This is an automated email from the ASF dual-hosted git repository.

liaoxin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new b4f471050a7 [fix](merge-clod) fix file not found when load for mow 
table (#32144)
b4f471050a7 is described below

commit b4f471050a76983f972ec510d4dc0fd4d0143ff3
Author: Xin Liao <liaoxin...@126.com>
AuthorDate: Wed Mar 13 14:23:56 2024 +0800

    [fix](merge-clod) fix file not found when load for mow table (#32144)
---
 be/src/olap/rowset/beta_rowset_writer.cpp |  5 +++++
 be/src/olap/rowset/segment_creator.cpp    | 13 ++++++++++---
 be/src/olap/rowset/segment_creator.h      |  9 ++++++++-
 3 files changed, 23 insertions(+), 4 deletions(-)

diff --git a/be/src/olap/rowset/beta_rowset_writer.cpp 
b/be/src/olap/rowset/beta_rowset_writer.cpp
index 6fadbf0910d..609a06e40bb 100644
--- a/be/src/olap/rowset/beta_rowset_writer.cpp
+++ b/be/src/olap/rowset/beta_rowset_writer.cpp
@@ -804,6 +804,11 @@ Status BaseBetaRowsetWriter::add_segment(uint32_t 
segment_id, const SegmentStati
         update_rowset_schema(flush_schema);
     }
     if (_context.mow_context != nullptr) {
+        // ensure that the segment file writing is complete
+        auto* file_writer = _segment_creator.get_file_writer(segment_id);
+        if (file_writer) {
+            RETURN_IF_ERROR(file_writer->close());
+        }
         RETURN_IF_ERROR(_generate_delete_bitmap(segment_id));
     }
     return Status::OK();
diff --git a/be/src/olap/rowset/segment_creator.cpp 
b/be/src/olap/rowset/segment_creator.cpp
index 8f2553ade59..9a62055ac3f 100644
--- a/be/src/olap/rowset/segment_creator.cpp
+++ b/be/src/olap/rowset/segment_creator.cpp
@@ -151,7 +151,7 @@ Status 
SegmentFlusher::_expand_variant_to_subcolumns(vectorized::Block& block,
 
 Status SegmentFlusher::close() {
     std::lock_guard<SpinLock> l(_lock);
-    for (auto& file_writer : _file_writers) {
+    for (auto& [segment_id, file_writer] : _file_writers) {
         Status status = file_writer->close();
         if (!status.ok()) {
             LOG(WARNING) << "failed to close file writer, path=" << 
file_writer->path()
@@ -205,7 +205,7 @@ Status 
SegmentFlusher::_create_segment_writer(std::unique_ptr<segment_v2::Segmen
             _context->max_rows_per_segment, writer_options, 
_context->mow_context));
     {
         std::lock_guard<SpinLock> l(_lock);
-        _file_writers.push_back(std::move(file_writer));
+        _file_writers.emplace(segment_id, std::move(file_writer));
     }
     auto s = writer->init();
     if (!s.ok()) {
@@ -236,7 +236,7 @@ Status SegmentFlusher::_create_segment_writer(
             _context->max_rows_per_segment, writer_options, 
_context->mow_context));
     {
         std::lock_guard<SpinLock> l(_lock);
-        _file_writers.push_back(std::move(file_writer));
+        _file_writers.emplace(segment_id, std::move(file_writer));
     }
     auto s = writer->init();
     if (!s.ok()) {
@@ -345,6 +345,13 @@ Status 
SegmentFlusher::create_writer(std::unique_ptr<SegmentFlusher::Writer>& wr
     return Status::OK();
 }
 
+// Returns the file writer registered for `segment_id`, or nullptr if none exists.
+io::FileWriter* SegmentFlusher::get_file_writer(int32_t segment_id) {
+    std::lock_guard<SpinLock> l(_lock); // _file_writers is also written under _lock
+    auto it = _file_writers.find(segment_id);
+    return it == _file_writers.end() ? nullptr : it->second.get();
+}
+
 SegmentFlusher::Writer::Writer(SegmentFlusher* flusher,
                                std::unique_ptr<segment_v2::SegmentWriter>& 
segment_writer)
         : _flusher(flusher), _writer(std::move(segment_writer)) {};
diff --git a/be/src/olap/rowset/segment_creator.h 
b/be/src/olap/rowset/segment_creator.h
index 668c75e47b3..fe439d3bc7a 100644
--- a/be/src/olap/rowset/segment_creator.h
+++ b/be/src/olap/rowset/segment_creator.h
@@ -20,6 +20,7 @@
 #include <gen_cpp/olap_file.pb.h>
 
 #include <string>
+#include <unordered_map>
 #include <vector>
 
 #include "common/status.h"
@@ -102,6 +103,8 @@ public:
 
     int64_t num_rows_filtered() const { return _num_rows_filtered; }
 
+    io::FileWriter* get_file_writer(int32_t segment_id);
+
     Status close();
 
 public:
@@ -153,7 +156,7 @@ private:
     RowsetWriterContext* _context;
 
     mutable SpinLock _lock; // protect following vectors.
-    std::vector<io::FileWriterPtr> _file_writers;
+    std::unordered_map<int32_t, io::FileWriterPtr> _file_writers;
 
     // written rows by add_block/add_row
     std::atomic<int64_t> _num_rows_written = 0;
@@ -196,6 +199,10 @@ public:
 
     Status close();
 
+    io::FileWriter* get_file_writer(int32_t segment_id) { // forwards to _segment_flusher
+        return _segment_flusher.get_file_writer(segment_id);
+    }
+
 private:
     std::atomic<int32_t> _next_segment_id = 0;
     SegmentFlusher _segment_flusher;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to