This is an automated email from the ASF dual-hosted git repository. liaoxin pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new b4f471050a7 [fix](merge-clod) fix file not found when load for mow table (#32144) b4f471050a7 is described below commit b4f471050a76983f972ec510d4dc0fd4d0143ff3 Author: Xin Liao <liaoxin...@126.com> AuthorDate: Wed Mar 13 14:23:56 2024 +0800 [fix](merge-clod) fix file not found when load for mow table (#32144) --- be/src/olap/rowset/beta_rowset_writer.cpp | 5 +++++ be/src/olap/rowset/segment_creator.cpp | 13 ++++++++++--- be/src/olap/rowset/segment_creator.h | 9 ++++++++- 3 files changed, 23 insertions(+), 4 deletions(-) diff --git a/be/src/olap/rowset/beta_rowset_writer.cpp b/be/src/olap/rowset/beta_rowset_writer.cpp index 6fadbf0910d..609a06e40bb 100644 --- a/be/src/olap/rowset/beta_rowset_writer.cpp +++ b/be/src/olap/rowset/beta_rowset_writer.cpp @@ -804,6 +804,11 @@ Status BaseBetaRowsetWriter::add_segment(uint32_t segment_id, const SegmentStati update_rowset_schema(flush_schema); } if (_context.mow_context != nullptr) { + // ensure that the segment file writing is complete + auto* file_writer = _segment_creator.get_file_writer(segment_id); + if (file_writer) { + RETURN_IF_ERROR(file_writer->close()); + } RETURN_IF_ERROR(_generate_delete_bitmap(segment_id)); } return Status::OK(); diff --git a/be/src/olap/rowset/segment_creator.cpp b/be/src/olap/rowset/segment_creator.cpp index 8f2553ade59..9a62055ac3f 100644 --- a/be/src/olap/rowset/segment_creator.cpp +++ b/be/src/olap/rowset/segment_creator.cpp @@ -151,7 +151,7 @@ Status SegmentFlusher::_expand_variant_to_subcolumns(vectorized::Block& block, Status SegmentFlusher::close() { std::lock_guard<SpinLock> l(_lock); - for (auto& file_writer : _file_writers) { + for (auto& [segment_id, file_writer] : _file_writers) { Status status = file_writer->close(); if (!status.ok()) { LOG(WARNING) << "failed to close file writer, path=" << file_writer->path() @@ -205,7 +205,7 @@ Status SegmentFlusher::_create_segment_writer(std::unique_ptr<segment_v2::Segmen _context->max_rows_per_segment, writer_options, _context->mow_context)); { std::lock_guard<SpinLock> l(_lock); - _file_writers.push_back(std::move(file_writer)); + _file_writers.emplace(segment_id, std::move(file_writer)); } auto s = writer->init(); if (!s.ok()) { @@ -236,7 +236,7 @@ Status SegmentFlusher::_create_segment_writer( _context->max_rows_per_segment, writer_options, _context->mow_context)); { std::lock_guard<SpinLock> l(_lock); - _file_writers.push_back(std::move(file_writer)); + _file_writers.emplace(segment_id, std::move(file_writer)); } auto s = writer->init(); if (!s.ok()) { @@ -345,6 +345,13 @@ Status SegmentFlusher::create_writer(std::unique_ptr<SegmentFlusher::Writer>& wr return Status::OK(); } +io::FileWriter* SegmentFlusher::get_file_writer(int32_t segment_id) { + if (!_file_writers.contains(segment_id)) { + return nullptr; + } + return _file_writers[segment_id].get(); +} + SegmentFlusher::Writer::Writer(SegmentFlusher* flusher, std::unique_ptr<segment_v2::SegmentWriter>& segment_writer) : _flusher(flusher), _writer(std::move(segment_writer)) {}; diff --git a/be/src/olap/rowset/segment_creator.h b/be/src/olap/rowset/segment_creator.h index 668c75e47b3..fe439d3bc7a 100644 --- a/be/src/olap/rowset/segment_creator.h +++ b/be/src/olap/rowset/segment_creator.h @@ -20,6 +20,7 @@ #include <gen_cpp/olap_file.pb.h> #include <string> +#include <unordered_map> #include <vector> #include "common/status.h" @@ -102,6 +103,8 @@ public: int64_t num_rows_filtered() const { return _num_rows_filtered; } + io::FileWriter* get_file_writer(int32_t segment_id); + Status close(); public: @@ -153,7 +156,7 @@ private: RowsetWriterContext* _context; mutable SpinLock _lock; // protect following vectors. - std::vector<io::FileWriterPtr> _file_writers; + std::unordered_map<int32_t, io::FileWriterPtr> _file_writers; // written rows by add_block/add_row std::atomic<int64_t> _num_rows_written = 0; @@ -196,6 +199,10 @@ public: Status close(); + io::FileWriter* get_file_writer(int32_t segment_id) { + return _segment_flusher.get_file_writer(segment_id); + } + private: std::atomic<int32_t> _next_segment_id = 0; SegmentFlusher _segment_flusher; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org