This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch spill_repartition in repository https://gitbox.apache.org/repos/asf/doris.git
commit c0380ad4594c74e444d707e43537a15b98c61bf1 Author: yiguolei <[email protected]> AuthorDate: Wed Mar 4 13:11:14 2026 +0800 change spill file to shared ptr --- be/src/pipeline/exec/multi_cast_data_streamer.cpp | 2 +- be/src/pipeline/exec/multi_cast_data_streamer.h | 2 +- .../exec/partitioned_aggregation_sink_operator.cpp | 9 ++++++ .../exec/partitioned_aggregation_sink_operator.h | 2 +- .../partitioned_aggregation_source_operator.cpp | 5 ++++ .../exec/partitioned_aggregation_source_operator.h | 2 +- .../exec/partitioned_hash_join_probe_operator.cpp | 20 +++++++++++-- .../exec/partitioned_hash_join_probe_operator.h | 6 ++-- .../exec/partitioned_hash_join_sink_operator.cpp | 9 ++++++ .../exec/partitioned_hash_join_sink_operator.h | 2 +- be/src/pipeline/exec/spill_sort_sink_operator.cpp | 5 ++++ be/src/pipeline/exec/spill_sort_sink_operator.h | 2 +- .../pipeline/exec/spill_sort_source_operator.cpp | 13 +++++++- be/src/pipeline/exec/spill_sort_source_operator.h | 2 +- be/src/vec/spill/spill_file.cpp | 9 +++--- be/src/vec/spill/spill_file.h | 12 ++++---- be/src/vec/spill/spill_file_reader.h | 2 +- be/src/vec/spill/spill_file_writer.cpp | 35 +++++++++++++++------- be/src/vec/spill/spill_file_writer.h | 10 +++---- be/src/vec/spill/spill_repartitioner.cpp | 2 +- be/src/vec/spill/spill_repartitioner.h | 6 ++-- 21 files changed, 113 insertions(+), 44 deletions(-) diff --git a/be/src/pipeline/exec/multi_cast_data_streamer.cpp b/be/src/pipeline/exec/multi_cast_data_streamer.cpp index b98763746fe..9006a236d28 100644 --- a/be/src/pipeline/exec/multi_cast_data_streamer.cpp +++ b/be/src/pipeline/exec/multi_cast_data_streamer.cpp @@ -252,7 +252,7 @@ Status MultiCastDataStreamer::_start_spill_task(RuntimeState* state, auto spill_func = [state, blocks = std::move(blocks), spill_file = std::move(spill_file), sink_profile]() mutable { const auto blocks_count = blocks.size(); - vectorized::SpillFileWriterUPtr writer; + vectorized::SpillFileWriterSPtr writer; RETURN_IF_ERROR(spill_file->create_writer(state, sink_profile, writer)); for (auto& block : blocks) { if (state->is_cancelled()) break; diff --git a/be/src/pipeline/exec/multi_cast_data_streamer.h b/be/src/pipeline/exec/multi_cast_data_streamer.h index 5e8526c8dda..6c123148319 100644 --- a/be/src/pipeline/exec/multi_cast_data_streamer.h +++ b/be/src/pipeline/exec/multi_cast_data_streamer.h @@ -45,7 +45,7 @@ struct MultiCastBlock { }; struct SpillingReader { - vectorized::SpillFileReaderUPtr reader; + vectorized::SpillFileReaderSPtr reader; vectorized::SpillFileSPtr spill_file; int64_t block_offset {0}; bool all_data_read {false}; diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp index 25d49935fde..ac729bf911c 100644 --- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp +++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp @@ -77,6 +77,15 @@ Status PartitionedAggSinkLocalState::close(RuntimeState* state, Status exec_stat if (Base::_closed) { return Status::OK(); } + + for (auto& writer : _spill_writers) { + if (writer) { + RETURN_IF_ERROR(writer->close()); + writer.reset(); + } + } + _spill_writers.clear(); + return Base::close(state, exec_status); } diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h index 6ac9759ebfe..5538c296562 100644 --- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h +++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h @@ -97,7 +97,7 @@ public: RuntimeProfile::Counter* _spill_serialize_hash_table_timer = nullptr; - std::vector<vectorized::SpillFileWriterUPtr> _spill_writers; + std::vector<vectorized::SpillFileWriterSPtr> _spill_writers; std::atomic<bool> _eos = false; }; diff --git a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp index fd3f6c284be..fe2ebb2df78 100644 --- a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp +++ b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp @@ -101,6 +101,11 @@ Status PartitionedAggLocalState::close(RuntimeState* state) { return Status::OK(); } + if (_current_reader) { + RETURN_IF_ERROR(_current_reader->close()); + _current_reader.reset(); + } + // Clean up partition queue resources. for (auto& partition : _partition_queue) { if (partition.spill_file) { diff --git a/be/src/pipeline/exec/partitioned_aggregation_source_operator.h b/be/src/pipeline/exec/partitioned_aggregation_source_operator.h index ff40cd411d4..518e59b5ac5 100644 --- a/be/src/pipeline/exec/partitioned_aggregation_source_operator.h +++ b/be/src/pipeline/exec/partitioned_aggregation_source_operator.h @@ -116,7 +116,7 @@ private: SpillRepartitioner _repartitioner; // Persistent reader for _recover_blocks_from_partition (survives across yield calls) - vectorized::SpillFileReaderUPtr _current_reader; + vectorized::SpillFileReaderSPtr _current_reader; }; class AggSourceOperatorX; diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp index 49156b83c9e..b24f86846d5 100644 --- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp +++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp @@ -186,6 +186,24 @@ Status PartitionedHashJoinProbeLocalState::close(RuntimeState* state) { if (_closed) { return Status::OK(); } + + for (auto& writer : _probe_writers) { + if (writer) { + RETURN_IF_ERROR(writer->close()); + writer.reset(); + } + } + _probe_writers.clear(); + + if (_current_build_reader) { + RETURN_IF_ERROR(_current_build_reader->close()); + _current_build_reader.reset(); + } + if (_current_probe_reader) { + RETURN_IF_ERROR(_current_probe_reader->close()); + _current_probe_reader.reset(); + } + // Clean up any remaining spill partition queue entries for (auto& entry : _spill_partition_queue) { if (entry.build_file) { @@ -203,8 +221,6 @@ Status PartitionedHashJoinProbeLocalState::close(RuntimeState* state) { ExecEnv::GetInstance()->spill_file_mgr()->delete_spill_file(_current_partition.probe_file); } _current_partition = JoinSpillPartitionInfo {}; - _current_build_reader.reset(); - _current_probe_reader.reset(); _queue_probe_blocks.clear(); RETURN_IF_ERROR(PipelineXSpillLocalState::close(state)); diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h index a375fe1d1f7..bbaccf5906b 100644 --- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h +++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h @@ -158,14 +158,14 @@ private: std::map<uint32_t, std::vector<vectorized::Block>> _probe_blocks; std::vector<vectorized::SpillFileSPtr> _probe_spilling_groups; - std::vector<vectorized::SpillFileWriterUPtr> _probe_writers; + std::vector<vectorized::SpillFileWriterSPtr> _probe_writers; std::unique_ptr<vectorized::PartitionerBase> _partitioner; std::unique_ptr<RuntimeProfile> _internal_runtime_profile; // Persistent readers for recovery across scheduling slices - vectorized::SpillFileReaderUPtr _current_build_reader; - vectorized::SpillFileReaderUPtr _current_probe_reader; + vectorized::SpillFileReaderSPtr _current_build_reader; + vectorized::SpillFileReaderSPtr _current_probe_reader; // ---- Spill partition queue state ---- // Whether _spill_partition_queue has been initialized from spilled build groups + diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp index 5f8526b4317..cac453c8b69 100644 --- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp +++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp @@ -84,6 +84,15 @@ Status PartitionedHashJoinSinkLocalState::close(RuntimeState* state, Status exec RETURN_IF_ERROR(p._inner_sink_operator->close(_shared_state->_inner_runtime_state.get(), exec_status)); } + + for (auto& writer : _build_writers) { + if (writer) { + RETURN_IF_ERROR(writer->close()); + writer.reset(); + } + } + _build_writers.clear(); + return PipelineXSpillSinkLocalState::close(state, exec_status); } diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h index 61f275c7156..12d4810bbdd 100644 --- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h +++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h @@ -94,7 +94,7 @@ protected: RuntimeProfile::Counter* _in_mem_rows_counter = nullptr; RuntimeProfile::Counter* _memory_usage_reserved = nullptr; - std::vector<vectorized::SpillFileWriterUPtr> _build_writers; + std::vector<vectorized::SpillFileWriterSPtr> _build_writers; }; class PartitionedHashJoinSinkOperatorX diff --git a/be/src/pipeline/exec/spill_sort_sink_operator.cpp b/be/src/pipeline/exec/spill_sort_sink_operator.cpp index 87aa89021b4..5ce2130eff4 100644 --- a/be/src/pipeline/exec/spill_sort_sink_operator.cpp +++ b/be/src/pipeline/exec/spill_sort_sink_operator.cpp @@ -66,6 +66,11 @@ void SpillSortSinkLocalState::update_profile(RuntimeProfile* child_profile) { #undef UPDATE_PROFILE Status SpillSortSinkLocalState::close(RuntimeState* state, Status execsink_status) { + if (_spilling_writer) { + RETURN_IF_ERROR(_spilling_writer->close()); + _spilling_writer.reset(); + } + _spilling_file.reset(); return Base::close(state, execsink_status); } diff --git a/be/src/pipeline/exec/spill_sort_sink_operator.h b/be/src/pipeline/exec/spill_sort_sink_operator.h index 19f9a0c85c9..36710e14582 100644 --- a/be/src/pipeline/exec/spill_sort_sink_operator.h +++ b/be/src/pipeline/exec/spill_sort_sink_operator.h @@ -62,7 +62,7 @@ private: RuntimeProfile::Counter* _spill_merge_sort_timer = nullptr; vectorized::SpillFileSPtr _spilling_file; - vectorized::SpillFileWriterUPtr _spilling_writer; + vectorized::SpillFileWriterSPtr _spilling_writer; std::atomic<bool> _eos = false; }; diff --git a/be/src/pipeline/exec/spill_sort_source_operator.cpp b/be/src/pipeline/exec/spill_sort_source_operator.cpp index 06150cce856..9b4eb4b0d0a 100644 --- a/be/src/pipeline/exec/spill_sort_source_operator.cpp +++ b/be/src/pipeline/exec/spill_sort_source_operator.cpp @@ -64,6 +64,17 @@ Status SpillSortLocalState::close(RuntimeState* state) { if (_closed) { return Status::OK(); } + + for (auto& reader : _current_merging_readers) { + if (reader) { + RETURN_IF_ERROR(reader->close()); + reader.reset(); + } + } + _current_merging_readers.clear(); + _current_merging_files.clear(); + _merger.reset(); + return Base::close(state); } @@ -100,7 +111,7 @@ Status SpillSortLocalState::execute_merge_sort_spill_files(RuntimeState* state) ExecEnv::GetInstance()->spill_file_mgr()->next_id()); RETURN_IF_ERROR(ExecEnv::GetInstance()->spill_file_mgr()->create_spill_file(relative_path, tmp_file)); - vectorized::SpillFileWriterUPtr tmp_writer; + vectorized::SpillFileWriterSPtr tmp_writer; RETURN_IF_ERROR(tmp_file->create_writer(state, operator_profile(), tmp_writer)); _shared_state->sorted_spill_groups.emplace_back(tmp_file); diff --git a/be/src/pipeline/exec/spill_sort_source_operator.h b/be/src/pipeline/exec/spill_sort_source_operator.h index 3b2943033a2..fe13c2aad20 100644 --- a/be/src/pipeline/exec/spill_sort_source_operator.h +++ b/be/src/pipeline/exec/spill_sort_source_operator.h @@ -63,7 +63,7 @@ protected: std::vector<vectorized::SpillFileSPtr> _current_merging_files; /// Readers held alive during merge; one per SpillFile, reads parts sequentially. - std::vector<vectorized::SpillFileReaderUPtr> _current_merging_readers; + std::vector<vectorized::SpillFileReaderSPtr> _current_merging_readers; std::unique_ptr<vectorized::VSortedRunMerger> _merger; std::unique_ptr<RuntimeProfile> _internal_runtime_profile; diff --git a/be/src/vec/spill/spill_file.cpp b/be/src/vec/spill/spill_file.cpp index 7be97d738c3..81d8d322478 100644 --- a/be/src/vec/spill/spill_file.cpp +++ b/be/src/vec/spill/spill_file.cpp @@ -65,17 +65,18 @@ void SpillFile::gc() { } Status SpillFile::create_writer(RuntimeState* state, RuntimeProfile* profile, - SpillFileWriterUPtr& writer) { - writer = std::make_unique<SpillFileWriter>(this, state, profile, _data_dir, _spill_dir); + SpillFileWriterSPtr& writer) { + writer = std::make_shared<SpillFileWriter>(shared_from_this(), state, profile, _data_dir, + _spill_dir); // record active writer _active_writer = writer.get(); return Status::OK(); } -SpillFileReaderUPtr SpillFile::create_reader(RuntimeState* state, RuntimeProfile* profile) const { +SpillFileReaderSPtr SpillFile::create_reader(RuntimeState* state, RuntimeProfile* profile) const { // It's a programming error to create a reader while a writer is still active. DCHECK(_active_writer == nullptr) << "create_reader() called while writer still active"; - return std::make_unique<SpillFileReader>(state, profile, _spill_dir, _part_count); + return std::make_shared<SpillFileReader>(state, profile, _spill_dir, _part_count); } void SpillFile::finish_writing(int64_t total_written_bytes, size_t part_count) { diff --git a/be/src/vec/spill/spill_file.h b/be/src/vec/spill/spill_file.h index 2d938d98fdb..362b522f10c 100644 --- a/be/src/vec/spill/spill_file.h +++ b/be/src/vec/spill/spill_file.h @@ -32,8 +32,8 @@ class Block; class SpillDataDir; class SpillFileWriter; class SpillFileReader; -using SpillFileWriterUPtr = std::unique_ptr<SpillFileWriter>; -using SpillFileReaderUPtr = std::unique_ptr<SpillFileReader>; +using SpillFileWriterSPtr = std::shared_ptr<SpillFileWriter>; +using SpillFileReaderSPtr = std::shared_ptr<SpillFileReader>; /// SpillFile represents a logical spill file that may consist of multiple /// physical "part" files on disk. Parts are managed automatically by @@ -46,7 +46,7 @@ using SpillFileReaderUPtr = std::unique_ptr<SpillFileReader>; /// +-- ... /// /// Writing workflow: -/// SpillFileWriterUPtr writer; +/// SpillFileWriterSPtr writer; /// RETURN_IF_ERROR(spill_file->create_writer(state, profile, writer)); /// RETURN_IF_ERROR(writer->write_block(state, block)); // auto-rotates parts /// RETURN_IF_ERROR(writer->close()); // finalizes all parts @@ -55,7 +55,7 @@ using SpillFileReaderUPtr = std::unique_ptr<SpillFileReader>; /// auto reader = spill_file->create_reader(state, profile); /// RETURN_IF_ERROR(reader->open()); /// while (!eos) { RETURN_IF_ERROR(reader->read(&block, &eos)); } -class SpillFile { +class SpillFile : public std::enable_shared_from_this<SpillFile> { public: // to avoid too many small file writes static constexpr size_t MIN_SPILL_WRITE_BATCH_MEM = 512 * 1024; @@ -80,11 +80,11 @@ public: /// Create a SpillFileWriter that automatically manages multi-part rotation. /// Only one writer should exist per SpillFile at a time. /// Part size threshold is read from config::spill_file_part_size_bytes. - Status create_writer(RuntimeState* state, RuntimeProfile* profile, SpillFileWriterUPtr& writer); + Status create_writer(RuntimeState* state, RuntimeProfile* profile, SpillFileWriterSPtr& writer); /// Create a SpillFileReader that reads sequentially across all parts. /// The caller should call reader->open() before reading. - SpillFileReaderUPtr create_reader(RuntimeState* state, RuntimeProfile* profile) const; + SpillFileReaderSPtr create_reader(RuntimeState* state, RuntimeProfile* profile) const; private: friend class SpillFileWriter; diff --git a/be/src/vec/spill/spill_file_reader.h b/be/src/vec/spill/spill_file_reader.h index e76bf8355db..c77b58a3290 100644 --- a/be/src/vec/spill/spill_file_reader.h +++ b/be/src/vec/spill/spill_file_reader.h @@ -107,7 +107,7 @@ private: std::shared_ptr<ResourceContext> _resource_ctx = nullptr; }; -using SpillFileReaderUPtr = std::unique_ptr<SpillFileReader>; +using SpillFileReaderSPtr = std::shared_ptr<SpillFileReader>; } // namespace doris::vectorized #include "common/compile_check_end.h" diff --git a/be/src/vec/spill/spill_file_writer.cpp b/be/src/vec/spill/spill_file_writer.cpp index 3413142f9c8..d0ef50dbe60 100644 --- a/be/src/vec/spill/spill_file_writer.cpp +++ b/be/src/vec/spill/spill_file_writer.cpp @@ -32,10 +32,10 @@ namespace doris::vectorized { #include "common/compile_check_begin.h" -SpillFileWriter::SpillFileWriter(SpillFile* spill_file, RuntimeState* state, +SpillFileWriter::SpillFileWriter(const std::shared_ptr<SpillFile>& spill_file, RuntimeState* state, RuntimeProfile* profile, SpillDataDir* data_dir, const std::string& spill_dir) - : _spill_file(spill_file), + : _spill_file_wptr(spill_file), _data_dir(data_dir), _spill_dir(spill_dir), _max_part_size(config::spill_file_part_size_bytes), @@ -46,8 +46,9 @@ SpillFileWriter::SpillFileWriter(SpillFile* spill_file, RuntimeState* state, _memory_used_counter = common_profile->get_counter("MemoryUsage"); // Register this writer as the active writer for the SpillFile. - if (_spill_file) { - _spill_file->_active_writer = this; + auto spill_file_locked = _spill_file_wptr.lock(); + if (spill_file_locked) { + spill_file_locked->_active_writer = this; } // Custom (spill-specific) counters @@ -63,7 +64,14 @@ SpillFileWriter::SpillFileWriter(SpillFile* spill_file, RuntimeState* state, } SpillFileWriter::~SpillFileWriter() { - DCHECK(_closed) << "SpillFileWriter destroyed without close(), possible memory leak"; + if (_closed) { + return; + } + Status st = close(); + if (!st.ok()) { + LOG(WARNING) << "SpillFileWriter::~SpillFileWriter() failed: " << st.to_string() + << ", spill_dir=" << _spill_dir; + } } Status SpillFileWriter::_open_next_part() { @@ -158,13 +166,18 @@ Status SpillFileWriter::close() { RETURN_IF_ERROR(_close_current_part()); - if (!_spill_file || _spill_file->_active_writer != this) { - return Status::Error<INTERNAL_ERROR>( - "SpillFileWriter close() called but not registered as active writer, possible " - "double close or logic error"); + // Use weak_ptr lock to safely access SpillFile during close. + // If the SpillFile has already been destroyed, we skip the callback. + auto spill_file = _spill_file_wptr.lock(); + if (spill_file) { + if (spill_file->_active_writer != this) { + return Status::Error<INTERNAL_ERROR>( + "SpillFileWriter close() called but not registered as active writer, possible " + "double close or logic error"); + } + spill_file->finish_writing(_total_written_bytes, _total_parts); + spill_file->_active_writer = nullptr; } - _spill_file->finish_writing(_total_written_bytes, _total_parts); - _spill_file->_active_writer = nullptr; return Status::OK(); } diff --git a/be/src/vec/spill/spill_file_writer.h b/be/src/vec/spill/spill_file_writer.h index 2e45c8ae32c..8492d11ba13 100644 --- a/be/src/vec/spill/spill_file_writer.h +++ b/be/src/vec/spill/spill_file_writer.h @@ -38,7 +38,7 @@ class SpillFile; /// (config::spill_file_part_size_bytes). /// /// Usage: -/// SpillFileWriterUPtr writer; +/// SpillFileWriterSPtr writer; /// RETURN_IF_ERROR(spill_file->create_writer(state, profile, writer)); /// RETURN_IF_ERROR(writer->write_block(state, block)); /// RETURN_IF_ERROR(writer->close()); @@ -48,8 +48,8 @@ class SpillFile; /// directory. class SpillFileWriter { public: - SpillFileWriter(SpillFile* spill_file, RuntimeState* state, RuntimeProfile* profile, - SpillDataDir* data_dir, const std::string& spill_dir); + SpillFileWriter(const std::shared_ptr<SpillFile>& spill_file, RuntimeState* state, + RuntimeProfile* profile, SpillDataDir* data_dir, const std::string& spill_dir); ~SpillFileWriter(); @@ -75,7 +75,7 @@ private: Status _write_internal(const Block& block); // ── Back-reference ── - SpillFile* _spill_file; // non-owning; for close() callback + std::weak_ptr<SpillFile> _spill_file_wptr; // weak ref; use lock() in close() // ── Configuration ── SpillDataDir* _data_dir = nullptr; @@ -109,7 +109,7 @@ private: std::shared_ptr<ResourceContext> _resource_ctx = nullptr; }; -using SpillFileWriterUPtr = std::unique_ptr<SpillFileWriter>; +using SpillFileWriterSPtr = std::shared_ptr<SpillFileWriter>; } // namespace vectorized } // namespace doris diff --git a/be/src/vec/spill/spill_repartitioner.cpp b/be/src/vec/spill/spill_repartitioner.cpp index 452ce9a8c71..780927e117c 100644 --- a/be/src/vec/spill/spill_repartitioner.cpp +++ b/be/src/vec/spill/spill_repartitioner.cpp @@ -134,7 +134,7 @@ Status SpillRepartitioner::repartition(RuntimeState* state, return Status::OK(); } -Status SpillRepartitioner::repartition(RuntimeState* state, vectorized::SpillFileReaderUPtr& reader, +Status SpillRepartitioner::repartition(RuntimeState* state, vectorized::SpillFileReaderSPtr& reader, bool* done) { DCHECK(_output_spill_files != nullptr) << "setup_output() must be called first"; DCHECK(reader != nullptr) << "reader must not be null"; diff --git a/be/src/vec/spill/spill_repartitioner.h b/be/src/vec/spill/spill_repartitioner.h index 0effe325f54..f6894dc3637 100644 --- a/be/src/vec/spill/spill_repartitioner.h +++ b/be/src/vec/spill/spill_repartitioner.h @@ -107,7 +107,7 @@ public: /// and wants to repartition only the remaining data without re-reading /// from the beginning. Ownership of the reader is transferred on completion. /// Call repeatedly until done == true. - Status repartition(RuntimeState* state, vectorized::SpillFileReaderUPtr& reader, bool* done); + Status repartition(RuntimeState* state, vectorized::SpillFileReaderSPtr& reader, bool* done); /// Route a single in-memory block into output files via persistent writers. Status route_block(RuntimeState* state, vectorized::Block& block); @@ -158,11 +158,11 @@ private: // ── Persistent state across repartition/route_block calls ────── // Output writers (one per partition), created by setup_output() - std::vector<vectorized::SpillFileWriterUPtr> _output_writers; + std::vector<vectorized::SpillFileWriterSPtr> _output_writers; // Pointer to caller's output SpillFiles vector (for finalize) std::vector<vectorized::SpillFileSPtr>* _output_spill_files = nullptr; // Input reader for repartition(), persists across yield calls - vectorized::SpillFileReaderUPtr _input_reader; + vectorized::SpillFileReaderSPtr _input_reader; vectorized::SpillFileSPtr _current_input_file; }; --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
