This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 2013dcd0e9 [refactor](load) cleanup segment flush logic in beta rowset writer (#21635)
2013dcd0e9 is described below

commit 2013dcd0e96fb79493884dc933e9f2e6f941986d
Author: Kaijie Chen <c...@apache.org>
AuthorDate: Tue Jul 18 18:17:57 2023 +0800

    [refactor](load) cleanup segment flush logic in beta rowset writer (#21635)
---
 be/src/olap/memtable_flush_executor.cpp   |   7 +-
 be/src/olap/rowset/beta_rowset_writer.cpp | 222 +++++++++++++-----------------
 be/src/olap/rowset/beta_rowset_writer.h   |  47 ++++---
 be/src/olap/rowset/rowset_writer.h        |  18 +--
 be/src/olap/rowset/segcompaction.cpp      |   2 +-
 be/src/olap/tablet.cpp                    |   3 +-
 6 files changed, 132 insertions(+), 167 deletions(-)

diff --git a/be/src/olap/memtable_flush_executor.cpp b/be/src/olap/memtable_flush_executor.cpp
index cf478393af..70cd74e214 100644
--- a/be/src/olap/memtable_flush_executor.cpp
+++ b/be/src/olap/memtable_flush_executor.cpp
@@ -99,8 +99,11 @@ Status FlushToken::_do_flush_memtable(MemTable* memtable, 
int32_t segment_id, in
     int64_t duration_ns;
     SCOPED_RAW_TIMER(&duration_ns);
     std::unique_ptr<vectorized::Block> block = memtable->to_block();
-    
SKIP_MEMORY_CHECK(RETURN_IF_ERROR(_rowset_writer->unfold_variant_column_and_flush_block(
-            block.get(), segment_id, memtable->flush_mem_tracker(), 
flush_size)));
+    {
+        SCOPED_CONSUME_MEM_TRACKER(memtable->flush_mem_tracker());
+        SKIP_MEMORY_CHECK(RETURN_IF_ERROR(
+                _rowset_writer->flush_memtable(block.get(), segment_id, 
flush_size)));
+    }
     _memtable_stat += memtable->stat();
     DorisMetrics::instance()->memtable_flush_total->increment(1);
     
DorisMetrics::instance()->memtable_flush_duration_us->increment(duration_ns / 
1000);
diff --git a/be/src/olap/rowset/beta_rowset_writer.cpp b/be/src/olap/rowset/beta_rowset_writer.cpp
index d99c8e9ee4..4aa7815423 100644
--- a/be/src/olap/rowset/beta_rowset_writer.cpp
+++ b/be/src/olap/rowset/beta_rowset_writer.cpp
@@ -89,10 +89,10 @@ BetaRowsetWriter::~BetaRowsetWriter() {
         if (!fs) {
             return;
         }
-        auto max_segment_id = std::max(_num_segment.load(), 
_next_segment_id.load());
-        for (int i = 0; i < max_segment_id; ++i) {
-            std::string seg_path = BetaRowset::segment_file_path(
-                    _context.rowset_dir, _context.rowset_id, _segment_start_id 
+ i);
+        DCHECK_LE(_segment_start_id + _num_segment, _next_segment_id);
+        for (int i = _segment_start_id; i < _next_segment_id; ++i) {
+            std::string seg_path =
+                    BetaRowset::segment_file_path(_context.rowset_dir, 
_context.rowset_id, i);
             // Even if an error is encountered, these files that have not been 
cleaned up
             // will be cleaned up by the GC background. So here we only print 
the error
             // message when we encounter an error.
@@ -134,11 +134,9 @@ Status BetaRowsetWriter::add_block(const 
vectorized::Block* block) {
         return Status::OK();
     }
     if (UNLIKELY(_segment_writer == nullptr)) {
-        FlushContext ctx;
-        ctx.block = block;
-        RETURN_IF_ERROR(_create_segment_writer(&_segment_writer, &ctx));
+        RETURN_IF_ERROR(_create_segment_writer(_segment_writer, 
allocate_segment_id()));
     }
-    return _add_block(block, &_segment_writer);
+    return _add_block(block, _segment_writer);
 }
 
 Status BetaRowsetWriter::_generate_delete_bitmap(int32_t segment_id) {
@@ -446,46 +444,38 @@ Status 
BetaRowsetWriter::_segcompaction_ramaining_if_necessary() {
     return status;
 }
 
-Status BetaRowsetWriter::_do_add_block(const vectorized::Block* block,
-                                       
std::unique_ptr<segment_v2::SegmentWriter>* segment_writer,
-                                       size_t row_offset, size_t 
input_row_num) {
-    auto s = (*segment_writer)->append_block(block, row_offset, input_row_num);
+Status BetaRowsetWriter::_add_rows(const vectorized::Block* block,
+                                   std::unique_ptr<segment_v2::SegmentWriter>& 
segment_writer,
+                                   size_t row_offset, size_t input_row_num) {
+    auto s = segment_writer->append_block(block, row_offset, input_row_num);
     if (UNLIKELY(!s.ok())) {
         return Status::Error<WRITER_DATA_WRITE_ERROR>("failed to append block: 
{}", s.to_string());
     }
+    _raw_num_rows_written += input_row_num;
     return Status::OK();
 }
 
 Status BetaRowsetWriter::_add_block(const vectorized::Block* block,
-                                    
std::unique_ptr<segment_v2::SegmentWriter>* segment_writer,
-                                    const FlushContext* flush_ctx) {
+                                    
std::unique_ptr<segment_v2::SegmentWriter>& segment_writer) {
     size_t block_size_in_bytes = block->bytes();
     size_t block_row_num = block->rows();
     size_t row_avg_size_in_bytes = std::max((size_t)1, block_size_in_bytes / 
block_row_num);
     size_t row_offset = 0;
 
-    if (flush_ctx != nullptr && flush_ctx->segment_id.has_value()) {
-        // the entire block (memtable) should be flushed into single segment
-        RETURN_IF_ERROR(_do_add_block(block, segment_writer, 0, 
block_row_num));
-        _raw_num_rows_written += block_row_num;
-        return Status::OK();
-    }
-
     do {
-        auto max_row_add = 
(*segment_writer)->max_row_to_add(row_avg_size_in_bytes);
+        auto max_row_add = 
segment_writer->max_row_to_add(row_avg_size_in_bytes);
         if (UNLIKELY(max_row_add < 1)) {
             // no space for another single row, need flush now
             RETURN_IF_ERROR(_flush_segment_writer(segment_writer));
-            RETURN_IF_ERROR(_create_segment_writer(segment_writer, flush_ctx));
-            max_row_add = 
(*segment_writer)->max_row_to_add(row_avg_size_in_bytes);
+            RETURN_IF_ERROR(_create_segment_writer(segment_writer, 
allocate_segment_id()));
+            max_row_add = 
segment_writer->max_row_to_add(row_avg_size_in_bytes);
             DCHECK(max_row_add > 0);
         }
         size_t input_row_num = std::min(block_row_num - row_offset, 
size_t(max_row_add));
-        RETURN_IF_ERROR(_do_add_block(block, segment_writer, row_offset, 
input_row_num));
+        RETURN_IF_ERROR(_add_rows(block, segment_writer, row_offset, 
input_row_num));
         row_offset += input_row_num;
     } while (row_offset < block_row_num);
 
-    _raw_num_rows_written += block_row_num;
     return Status::OK();
 }
 
@@ -515,52 +505,45 @@ Status 
BetaRowsetWriter::add_rowset_for_linked_schema_change(RowsetSharedPtr row
 
 Status BetaRowsetWriter::flush() {
     if (_segment_writer != nullptr) {
-        RETURN_IF_ERROR(_flush_segment_writer(&_segment_writer));
+        RETURN_IF_ERROR(_flush_segment_writer(_segment_writer));
     }
     return Status::OK();
 }
 
-Status BetaRowsetWriter::unfold_variant_column_and_flush_block(
-        vectorized::Block* block, int32_t segment_id,
-        const std::shared_ptr<MemTracker>& flush_mem_tracker, int64_t* 
flush_size) {
-    SCOPED_CONSUME_MEM_TRACKER(flush_mem_tracker);
-
+Status BetaRowsetWriter::flush_memtable(vectorized::Block* block, int32_t 
segment_id,
+                                        int64_t* flush_size) {
     if (block->rows() == 0) {
         return Status::OK();
     }
 
-    FlushContext ctx;
-    ctx.block = block;
+    TabletSchemaSPtr flush_schema;
     if (_context.tablet_schema->is_dynamic_schema()) {
         // Unfold variant column
-        RETURN_IF_ERROR(_unfold_variant_column(*block, ctx.flush_schema));
+        RETURN_IF_ERROR(_unfold_variant_column(*block, flush_schema));
+    }
+    {
+        SCOPED_RAW_TIMER(&_segment_writer_ns);
+        RETURN_IF_ERROR(_flush_single_block(block, segment_id, flush_size, 
flush_schema));
     }
-    ctx.segment_id = std::optional<int32_t> {segment_id};
-    SCOPED_RAW_TIMER(&_segment_writer_ns);
-    RETURN_IF_ERROR(flush_single_block(block, flush_size, &ctx));
     RETURN_IF_ERROR(_generate_delete_bitmap(segment_id));
     RETURN_IF_ERROR(_segcompaction_if_necessary());
     return Status::OK();
 }
 
-Status BetaRowsetWriter::flush_single_block(const vectorized::Block* block, 
int64* flush_size,
-                                            const FlushContext* ctx) {
+Status BetaRowsetWriter::flush_single_block(const vectorized::Block* block) {
     if (block->rows() == 0) {
         return Status::OK();
     }
+    return _flush_single_block(block, allocate_segment_id());
+}
 
+Status BetaRowsetWriter::_flush_single_block(const vectorized::Block* block, 
int32_t segment_id,
+                                             int64_t* flush_size, 
TabletSchemaSPtr flush_schema) {
     std::unique_ptr<segment_v2::SegmentWriter> writer;
-    RETURN_IF_ERROR(_create_segment_writer(&writer, ctx));
-    segment_v2::SegmentWriter* raw_writer = writer.get();
-    int32_t segment_id = writer->get_segment_id();
-    RETURN_IF_ERROR(_add_block(block, &writer, ctx));
-    // if segment_id is present in flush context,
-    // the entire memtable should be flushed into a single segment
-    if (ctx != nullptr && ctx->segment_id.has_value()) {
-        DCHECK_EQ(writer->get_segment_id(), segment_id);
-        DCHECK_EQ(writer.get(), raw_writer);
-    }
-    RETURN_IF_ERROR(_flush_segment_writer(&writer, flush_size));
+    bool no_compression = block->bytes() <= 
config::segment_compression_threshold_kb * 1024;
+    RETURN_IF_ERROR(_create_segment_writer(writer, segment_id, no_compression, 
flush_schema));
+    RETURN_IF_ERROR(_add_rows(block, writer, 0, block->rows()));
+    RETURN_IF_ERROR(_flush_segment_writer(writer, flush_size));
     return Status::OK();
 }
 
@@ -600,7 +583,7 @@ RowsetSharedPtr BetaRowsetWriter::manual_build(const 
RowsetMetaSharedPtr& spec_r
 
 RowsetSharedPtr BetaRowsetWriter::build() {
     // make sure all segments are flushed
-    DCHECK_EQ(_num_segment, _next_segment_id);
+    DCHECK_EQ(_segment_start_id + _num_segment, _next_segment_id);
     // TODO(lingbin): move to more better place, or in a CreateBlockBatch?
     for (auto& file_writer : _file_writers) {
         Status status = file_writer->close();
@@ -762,12 +745,12 @@ RowsetSharedPtr BetaRowsetWriter::_build_tmp() {
     return rowset;
 }
 
-Status BetaRowsetWriter::_create_file_writer(std::string path, 
io::FileWriterPtr* file_writer) {
+Status BetaRowsetWriter::_create_file_writer(std::string path, 
io::FileWriterPtr& file_writer) {
     auto fs = _rowset_meta->fs();
     if (!fs) {
         return Status::Error<INIT_FAILED>("get fs failed");
     }
-    Status st = fs->create_file(path, file_writer);
+    Status st = fs->create_file(path, &file_writer);
     if (!st.ok()) {
         LOG(WARNING) << "failed to create writable file. path=" << path << ", 
err: " << st;
         return st;
@@ -777,85 +760,71 @@ Status BetaRowsetWriter::_create_file_writer(std::string 
path, io::FileWriterPtr
     return Status::OK();
 }
 
-Status BetaRowsetWriter::create_file_writer(uint32_t segment_id, 
io::FileWriterPtr* file_writer) {
+Status BetaRowsetWriter::create_file_writer(uint32_t segment_id, 
io::FileWriterPtr& file_writer) {
     std::string path;
     path = BetaRowset::segment_file_path(_context.rowset_dir, 
_context.rowset_id, segment_id);
     return _create_file_writer(path, file_writer);
 }
 
-Status BetaRowsetWriter::_create_file_writer(uint32_t begin, uint32_t end,
-                                             io::FileWriterPtr* file_writer) {
-    std::string path;
-    path = BetaRowset::local_segment_path_segcompacted(_context.rowset_dir, 
_context.rowset_id,
-                                                       begin, end);
-    return _create_file_writer(path, file_writer);
+Status BetaRowsetWriter::_create_segment_writer_for_segcompaction(
+        std::unique_ptr<segment_v2::SegmentWriter>* writer, int64_t begin, 
int64_t end) {
+    DCHECK(begin >= 0 && end >= 0);
+    std::string path = 
BetaRowset::local_segment_path_segcompacted(_context.rowset_dir,
+                                                                   
_context.rowset_id, begin, end);
+    io::FileWriterPtr file_writer;
+    RETURN_IF_ERROR(_create_file_writer(path, file_writer));
+
+    segment_v2::SegmentWriterOptions writer_options;
+    writer_options.enable_unique_key_merge_on_write = 
_context.enable_unique_key_merge_on_write;
+    writer_options.rowset_ctx = &_context;
+    writer_options.write_type = _context.write_type;
+    writer_options.write_type = DataWriteType::TYPE_COMPACTION;
+
+    writer->reset(new segment_v2::SegmentWriter(file_writer.get(), 
_num_segcompacted,
+                                                _context.tablet_schema, 
_context.tablet,
+                                                _context.data_dir, 
_context.max_rows_per_segment,
+                                                writer_options, 
_context.mow_context));
+    if (_segcompaction_worker.get_file_writer() != nullptr) {
+        _segcompaction_worker.get_file_writer()->close();
+    }
+    _segcompaction_worker.get_file_writer().reset(file_writer.release());
+
+    return Status::OK();
 }
 
-Status BetaRowsetWriter::_do_create_segment_writer(
-        std::unique_ptr<segment_v2::SegmentWriter>* writer, bool 
is_segcompaction, int64_t begin,
-        int64_t end, const FlushContext* flush_ctx) {
-    Status st;
-    std::string path;
-    int32_t segment_id = 0;
+Status 
BetaRowsetWriter::_create_segment_writer(std::unique_ptr<segment_v2::SegmentWriter>&
 writer,
+                                                int32_t segment_id, bool 
no_compression,
+                                                TabletSchemaSPtr flush_schema) 
{
+    RETURN_IF_ERROR(_check_segment_number_limit());
     io::FileWriterPtr file_writer;
-    if (is_segcompaction) {
-        DCHECK(begin >= 0 && end >= 0);
-        st = _create_file_writer(begin, end, &file_writer);
-    } else {
-        int32_t segid_offset = (flush_ctx != nullptr && 
flush_ctx->segment_id.has_value())
-                                       ? flush_ctx->segment_id.value()
-                                       : allocate_segment_id();
-        segment_id = segid_offset + _segment_start_id;
-        st = create_file_writer(segment_id, &file_writer);
-    }
-    if (!st.ok()) {
-        return st;
-    }
+    RETURN_IF_ERROR(create_file_writer(segment_id, file_writer));
 
     segment_v2::SegmentWriterOptions writer_options;
     writer_options.enable_unique_key_merge_on_write = 
_context.enable_unique_key_merge_on_write;
     writer_options.rowset_ctx = &_context;
     writer_options.write_type = _context.write_type;
-    if (is_segcompaction) {
-        writer_options.write_type = DataWriteType::TYPE_COMPACTION;
+    if (no_compression) {
+        writer_options.compression_type = NO_COMPRESSION;
     }
 
-    if (is_segcompaction) {
-        writer->reset(new segment_v2::SegmentWriter(
-                file_writer.get(), _num_segcompacted, _context.tablet_schema, 
_context.tablet,
-                _context.data_dir, _context.max_rows_per_segment, 
writer_options,
-                _context.mow_context));
-        if (_segcompaction_worker.get_file_writer() != nullptr) {
-            _segcompaction_worker.get_file_writer()->close();
-        }
-        _segcompaction_worker.get_file_writer().reset(file_writer.release());
-    } else {
-        const auto& tablet_schema = flush_ctx && flush_ctx->flush_schema ? 
flush_ctx->flush_schema
-                                                                         : 
_context.tablet_schema;
-        if (flush_ctx && flush_ctx->block &&
-            flush_ctx->block->bytes() <= 
config::segment_compression_threshold_kb * 1024) {
-            writer_options.compression_type = NO_COMPRESSION;
-        }
-        writer->reset(new segment_v2::SegmentWriter(
-                file_writer.get(), segment_id, tablet_schema, _context.tablet, 
_context.data_dir,
-                _context.max_rows_per_segment, writer_options, 
_context.mow_context));
-        {
-            std::lock_guard<SpinLock> l(_lock);
-            _file_writers.push_back(std::move(file_writer));
-        }
-        auto s = (*writer)->init();
-        if (!s.ok()) {
-            LOG(WARNING) << "failed to init segment writer: " << s.to_string();
-            writer->reset(nullptr);
-            return s;
-        }
+    const auto& tablet_schema = flush_schema ? flush_schema : 
_context.tablet_schema;
+    writer.reset(new segment_v2::SegmentWriter(
+            file_writer.get(), segment_id, tablet_schema, _context.tablet, 
_context.data_dir,
+            _context.max_rows_per_segment, writer_options, 
_context.mow_context));
+    {
+        std::lock_guard<SpinLock> l(_lock);
+        _file_writers.push_back(std::move(file_writer));
+    }
+    auto s = writer->init();
+    if (!s.ok()) {
+        LOG(WARNING) << "failed to init segment writer: " << s.to_string();
+        writer.reset();
+        return s;
     }
-
     return Status::OK();
 }
 
-Status 
BetaRowsetWriter::_create_segment_writer(std::unique_ptr<segment_v2::SegmentWriter>*
 writer,
-                                                const FlushContext* flush_ctx) 
{
+Status BetaRowsetWriter::_check_segment_number_limit() {
     size_t total_segment_num = _num_segment - _segcompacted_point + 1 + 
_num_segcompacted;
     if (UNLIKELY(total_segment_num > config::max_segment_num_per_rowset)) {
         return Status::Error<TOO_MANY_SEGMENTS>(
@@ -864,43 +833,42 @@ Status 
BetaRowsetWriter::_create_segment_writer(std::unique_ptr<segment_v2::Segm
                 _context.tablet_id, _context.rowset_id.to_string(),
                 config::max_segment_num_per_rowset, _num_segment, 
_segcompacted_point,
                 _num_segcompacted);
-    } else {
-        return _do_create_segment_writer(writer, false, -1, -1, flush_ctx);
     }
+    return Status::OK();
 }
 
-Status 
BetaRowsetWriter::_flush_segment_writer(std::unique_ptr<segment_v2::SegmentWriter>*
 writer,
+Status 
BetaRowsetWriter::_flush_segment_writer(std::unique_ptr<segment_v2::SegmentWriter>&
 writer,
                                                int64_t* flush_size) {
-    uint32_t segid = (*writer)->get_segment_id();
-    uint32_t row_num = (*writer)->num_rows_written();
+    uint32_t segid = writer->get_segment_id();
+    uint32_t row_num = writer->num_rows_written();
 
-    if ((*writer)->num_rows_written() == 0) {
+    if (writer->num_rows_written() == 0) {
         return Status::OK();
     }
     uint64_t segment_size;
     uint64_t index_size;
-    Status s = (*writer)->finalize(&segment_size, &index_size);
+    Status s = writer->finalize(&segment_size, &index_size);
     if (!s.ok()) {
         return Status::Error(s.code(), "failed to finalize segment: {}", 
s.to_string());
     }
     VLOG_DEBUG << "tablet_id:" << _context.tablet_id
-               << " flushing filename: " << (*writer)->get_data_dir()->path()
+               << " flushing filename: " << writer->get_data_dir()->path()
                << " rowset_id:" << _context.rowset_id << " segment num:" << 
_num_segment;
 
     KeyBoundsPB key_bounds;
-    Slice min_key = (*writer)->min_encoded_key();
-    Slice max_key = (*writer)->max_encoded_key();
+    Slice min_key = writer->min_encoded_key();
+    Slice max_key = writer->max_encoded_key();
     DCHECK_LE(min_key.compare(max_key), 0);
     key_bounds.set_min_key(min_key.to_string());
     key_bounds.set_max_key(max_key.to_string());
 
     SegmentStatistics segstat;
     segstat.row_num = row_num;
-    segstat.data_size = segment_size + 
(*writer)->get_inverted_index_file_size();
-    segstat.index_size = index_size + 
(*writer)->get_inverted_index_file_size();
+    segstat.data_size = segment_size + writer->get_inverted_index_file_size();
+    segstat.index_size = index_size + writer->get_inverted_index_file_size();
     segstat.key_bounds = key_bounds;
 
-    writer->reset();
+    writer.reset();
     if (flush_size) {
         *flush_size = segment_size + index_size;
     }
diff --git a/be/src/olap/rowset/beta_rowset_writer.h b/be/src/olap/rowset/beta_rowset_writer.h
index e88efdc000..bb3e55d764 100644
--- a/be/src/olap/rowset/beta_rowset_writer.h
+++ b/be/src/olap/rowset/beta_rowset_writer.h
@@ -84,20 +84,18 @@ public:
 
     Status add_rowset_for_linked_schema_change(RowsetSharedPtr rowset) 
override;
 
-    Status create_file_writer(uint32_t segment_id, io::FileWriterPtr* writer);
+    Status create_file_writer(uint32_t segment_id, io::FileWriterPtr& writer);
 
     void add_segment(uint32_t segid, SegmentStatistics& segstat);
 
     Status flush() override;
 
-    Status unfold_variant_column_and_flush_block(
-            vectorized::Block* block, int32_t segment_id,
-            const std::shared_ptr<MemTracker>& flush_mem_tracker, int64_t* 
flush_size) override;
+    Status flush_memtable(vectorized::Block* block, int32_t segment_id,
+                          int64_t* flush_size) override;
 
     // Return the file size flushed to disk in "flush_size"
     // This method is thread-safe.
-    Status flush_single_block(const vectorized::Block* block, int64_t* 
flush_size,
-                              const FlushContext* ctx = nullptr) override;
+    Status flush_single_block(const vectorized::Block* block) override;
 
     RowsetSharedPtr build() override;
 
@@ -129,31 +127,38 @@ public:
 
     Status wait_flying_segcompaction() override;
 
-    void set_segment_start_id(int32_t start_id) override { _segment_start_id = 
start_id; }
+    void set_segment_start_id(int32_t start_id) override {
+        _segment_start_id = start_id;
+        _next_segment_id = start_id;
+    }
 
     int64_t delete_bitmap_ns() override { return _delete_bitmap_ns; }
 
     int64_t segment_writer_ns() override { return _segment_writer_ns; }
 
 private:
-    Status _do_add_block(const vectorized::Block* block,
-                         std::unique_ptr<segment_v2::SegmentWriter>* 
segment_writer,
-                         size_t row_offset, size_t input_row_num);
+    Status _add_rows(const vectorized::Block* block,
+                     std::unique_ptr<segment_v2::SegmentWriter>& 
segment_writer, size_t row_offset,
+                     size_t input_row_num);
     Status _add_block(const vectorized::Block* block,
-                      std::unique_ptr<segment_v2::SegmentWriter>* writer,
-                      const FlushContext* flush_ctx = nullptr);
-
-    Status _create_file_writer(std::string path, io::FileWriterPtr* 
file_writer);
-    Status _create_file_writer(uint32_t begin, uint32_t end, 
io::FileWriterPtr* writer);
-    Status 
_do_create_segment_writer(std::unique_ptr<segment_v2::SegmentWriter>* writer,
-                                     bool is_segcompaction, int64_t begin, 
int64_t end,
-                                     const FlushContext* ctx = nullptr);
-    Status _create_segment_writer(std::unique_ptr<segment_v2::SegmentWriter>* 
writer,
-                                  const FlushContext* ctx = nullptr);
-    Status _flush_segment_writer(std::unique_ptr<segment_v2::SegmentWriter>* 
writer,
+                      std::unique_ptr<segment_v2::SegmentWriter>& writer);
+
+    Status _create_file_writer(std::string path, io::FileWriterPtr& 
file_writer);
+    Status _check_segment_number_limit();
+    Status _create_segment_writer(std::unique_ptr<segment_v2::SegmentWriter>& 
writer,
+                                  int32_t segment_id, bool no_compression = 
false,
+                                  TabletSchemaSPtr flush_schema = nullptr);
+    Status _flush_segment_writer(std::unique_ptr<segment_v2::SegmentWriter>& 
writer,
                                  int64_t* flush_size = nullptr);
+    Status _flush_single_block(const vectorized::Block* block, int32_t 
segment_id,
+                               int64_t* flush_size = nullptr,
+                               TabletSchemaSPtr flush_schema = nullptr);
     Status _generate_delete_bitmap(int32_t segment_id);
     void _build_rowset_meta(std::shared_ptr<RowsetMeta> rowset_meta);
+
+    // segment compaction
+    Status _create_segment_writer_for_segcompaction(
+            std::unique_ptr<segment_v2::SegmentWriter>* writer, int64_t begin, 
int64_t end);
     Status _segcompaction_if_necessary();
     Status _segcompaction_ramaining_if_necessary();
     Status 
_load_noncompacted_segments(std::vector<segment_v2::SegmentSharedPtr>* segments,
diff --git a/be/src/olap/rowset/rowset_writer.h b/be/src/olap/rowset/rowset_writer.h
index 45db4afe34..9a6ae89dd6 100644
--- a/be/src/olap/rowset/rowset_writer.h
+++ b/be/src/olap/rowset/rowset_writer.h
@@ -35,14 +35,6 @@ namespace doris {
 
 class MemTable;
 
-// Context for single memtable flush
-struct FlushContext {
-    ENABLE_FACTORY_CREATOR(FlushContext);
-    TabletSchemaSPtr flush_schema = nullptr;
-    const vectorized::Block* block = nullptr;
-    std::optional<int32_t> segment_id = std::nullopt;
-};
-
 class RowsetWriter {
 public:
     RowsetWriter() = default;
@@ -78,15 +70,13 @@ public:
                 "RowsetWriter not support final_flush");
     }
 
-    virtual Status unfold_variant_column_and_flush_block(
-            vectorized::Block* block, int32_t segment_id,
-            const std::shared_ptr<MemTracker>& flush_mem_tracker, int64_t* 
flush_size) {
+    virtual Status flush_memtable(vectorized::Block* block, int32_t segment_id,
+                                  int64_t* flush_size) {
         return Status::Error<ErrorCode::NOT_IMPLEMENTED_ERROR>(
-                "RowsetWriter not support 
unfold_variant_column_and_flush_block");
+                "RowsetWriter not support flush_memtable");
     }
 
-    virtual Status flush_single_block(const vectorized::Block* block, int64_t* 
flush_size,
-                                      const FlushContext* ctx = nullptr) {
+    virtual Status flush_single_block(const vectorized::Block* block) {
         return Status::Error<ErrorCode::NOT_IMPLEMENTED_ERROR>(
                 "RowsetWriter not support flush_single_block");
     }
diff --git a/be/src/olap/rowset/segcompaction.cpp b/be/src/olap/rowset/segcompaction.cpp
index 234651a278..cac1e78fa6 100644
--- a/be/src/olap/rowset/segcompaction.cpp
+++ b/be/src/olap/rowset/segcompaction.cpp
@@ -187,7 +187,7 @@ Status 
SegcompactionWorker::_check_correctness(OlapReaderStatistics& reader_stat
 
 Status SegcompactionWorker::_create_segment_writer_for_segcompaction(
         std::unique_ptr<segment_v2::SegmentWriter>* writer, uint64_t begin, 
uint64_t end) {
-    return _writer->_do_create_segment_writer(writer, true, begin, end);
+    return _writer->_create_segment_writer_for_segcompaction(writer, begin, 
end);
 }
 
 Status 
SegcompactionWorker::_do_compact_segments(SegCompactionCandidatesSharedPtr 
segments) {
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index 9ad6096bfe..e53661dce8 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -2977,8 +2977,7 @@ Status Tablet::calc_segment_delete_bitmap(RowsetSharedPtr 
rowset,
         RETURN_IF_ERROR(generate_new_block_for_partial_update(
                 rowset_schema, read_plan_ori, read_plan_update, 
rsid_to_rowset, &block));
         sort_block(block, ordered_block);
-        int64_t size;
-        RETURN_IF_ERROR(rowset_writer->flush_single_block(&ordered_block, 
&size));
+        RETURN_IF_ERROR(rowset_writer->flush_single_block(&ordered_block));
     }
     LOG(INFO) << "calc segment delete bitmap, tablet: " << tablet_id() << " 
rowset: " << rowset_id
               << " seg_id: " << seg->id() << " dummy_version: " << end_version 
+ 1


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to