zhannngchen commented on code in PR #12866:
URL: https://github.com/apache/doris/pull/12866#discussion_r977657612


##########
be/src/olap/olap_server.cpp:
##########
@@ -700,6 +708,19 @@ Status 
StorageEngine::submit_quick_compaction_task(TabletSharedPtr tablet) {
     return Status::OK();
 }
 
+Status StorageEngine::_handle_seg_compaction(BetaRowsetWriter* writer,
+                                             SegCompactionCandidatesSharedPtr 
segments) {
+    writer->do_segcompaction(segments);
+    return Status::OK();
+}
+
+Status StorageEngine::submit_seg_compaction_task(BetaRowsetWriter* writer,
+                                                 
SegCompactionCandidatesSharedPtr segments) {
+    _seg_compaction_thread_pool->submit_func(

Review Comment:
   ditto



##########
be/src/olap/olap_server.cpp:
##########
@@ -700,6 +708,19 @@ Status 
StorageEngine::submit_quick_compaction_task(TabletSharedPtr tablet) {
     return Status::OK();
 }
 
+Status StorageEngine::_handle_seg_compaction(BetaRowsetWriter* writer,
+                                             SegCompactionCandidatesSharedPtr 
segments) {
+    writer->do_segcompaction(segments);

Review Comment:
   Should this be `return writer->do_segcompaction(segments);`?



##########
be/src/olap/rowset/beta_rowset_writer.cpp:
##########
@@ -102,6 +110,284 @@ Status BetaRowsetWriter::add_block(const 
vectorized::Block* block) {
     return _add_block(block, &_segment_writer);
 }
 
+vectorized::VMergeIterator* BetaRowsetWriter::get_segcompaction_reader(
+        SegCompactionCandidatesSharedPtr segments, std::shared_ptr<Schema> 
schema,
+        OlapReaderStatistics* stat) {
+    StorageReadOptions read_options;
+    read_options.stats = stat;
+    read_options.use_page_cache = false;
+    read_options.tablet_schema = _context.tablet_schema;
+    std::vector<std::unique_ptr<RowwiseIterator>> seg_iterators;
+    for (auto& seg_ptr : *segments) {
+        std::unique_ptr<RowwiseIterator> iter;
+        auto s = seg_ptr->new_iterator(*schema, read_options, &iter);
+        if (!s.ok()) {
+            LOG(WARNING) << "failed to create iterator[" << seg_ptr->id() << 
"]: " << s.to_string();

Review Comment:
   Should we return here?



##########
be/src/common/config.h:
##########
@@ -875,6 +878,12 @@ CONF_Bool(enable_new_load_scan_node, "false");
 // Temp config. True to use new file scanner. Will remove after fully test.
 CONF_Bool(enable_new_file_scanner, "false");
 
+CONF_Bool(enable_segcompaction, "false"); // currently only support vectorized 
storage
+// Trigger segcompaction if the num of segments in a rowset exceeds this 
threshold.
+CONF_Int32(segcompaction_threshold_segment_num, "10");
+
+CONF_Int32(segcompaction_small_threshold, "1000000");

Review Comment:
    use 1048576 instead.



##########
be/src/olap/rowset/beta_rowset_writer.cpp:
##########
@@ -102,6 +110,284 @@ Status BetaRowsetWriter::add_block(const 
vectorized::Block* block) {
     return _add_block(block, &_segment_writer);
 }
 
+vectorized::VMergeIterator* BetaRowsetWriter::get_segcompaction_reader(
+        SegCompactionCandidatesSharedPtr segments, std::shared_ptr<Schema> 
schema,
+        OlapReaderStatistics* stat) {
+    StorageReadOptions read_options;
+    read_options.stats = stat;
+    read_options.use_page_cache = false;
+    read_options.tablet_schema = _context.tablet_schema;
+    std::vector<std::unique_ptr<RowwiseIterator>> seg_iterators;
+    for (auto& seg_ptr : *segments) {
+        std::unique_ptr<RowwiseIterator> iter;
+        auto s = seg_ptr->new_iterator(*schema, read_options, &iter);
+        if (!s.ok()) {
+            LOG(WARNING) << "failed to create iterator[" << seg_ptr->id() << 
"]: " << s.to_string();
+        }
+        seg_iterators.push_back(std::move(iter));
+    }
+    std::vector<RowwiseIterator*> iterators;
+    for (auto& owned_it : seg_iterators) {
+        // transfer ownership
+        iterators.push_back(owned_it.release());
+    }
+    bool is_unique = (_context.tablet_schema->keys_type() == UNIQUE_KEYS);
+    bool is_reverse = false;
+    auto merge_itr = vectorized::new_merge_iterator(iterators, -1, is_unique, 
is_reverse, nullptr);
+    merge_itr->init(read_options);
+
+    return (vectorized::VMergeIterator*)merge_itr;
+}
+
+std::unique_ptr<segment_v2::SegmentWriter> 
BetaRowsetWriter::create_segcompaction_writer(
+        uint64_t begin, uint64_t end) {
+    Status status;
+    std::unique_ptr<segment_v2::SegmentWriter> writer = nullptr;
+    status = _create_segment_writer_for_segcompaction(&writer, begin, end);
+    if (status != Status::OK()) {
+        writer = nullptr;
+        LOG(ERROR) << "failed to create segment writer for begin:" << begin << 
" end:" << end
+                   << " path:" << writer->get_data_dir()->path();
+    }
+    if (writer->get_data_dir())

Review Comment:
   `if (writer != nullptr && writer->get_data_dir())`



##########
be/src/olap/rowset/beta_rowset_writer.cpp:
##########
@@ -102,6 +110,284 @@ Status BetaRowsetWriter::add_block(const 
vectorized::Block* block) {
     return _add_block(block, &_segment_writer);
 }
 
+vectorized::VMergeIterator* BetaRowsetWriter::get_segcompaction_reader(
+        SegCompactionCandidatesSharedPtr segments, std::shared_ptr<Schema> 
schema,
+        OlapReaderStatistics* stat) {
+    StorageReadOptions read_options;
+    read_options.stats = stat;
+    read_options.use_page_cache = false;
+    read_options.tablet_schema = _context.tablet_schema;
+    std::vector<std::unique_ptr<RowwiseIterator>> seg_iterators;
+    for (auto& seg_ptr : *segments) {
+        std::unique_ptr<RowwiseIterator> iter;
+        auto s = seg_ptr->new_iterator(*schema, read_options, &iter);
+        if (!s.ok()) {
+            LOG(WARNING) << "failed to create iterator[" << seg_ptr->id() << 
"]: " << s.to_string();
+        }
+        seg_iterators.push_back(std::move(iter));
+    }
+    std::vector<RowwiseIterator*> iterators;
+    for (auto& owned_it : seg_iterators) {
+        // transfer ownership
+        iterators.push_back(owned_it.release());
+    }
+    bool is_unique = (_context.tablet_schema->keys_type() == UNIQUE_KEYS);
+    bool is_reverse = false;
+    auto merge_itr = vectorized::new_merge_iterator(iterators, -1, is_unique, 
is_reverse, nullptr);
+    merge_itr->init(read_options);
+
+    return (vectorized::VMergeIterator*)merge_itr;
+}
+
+std::unique_ptr<segment_v2::SegmentWriter> 
BetaRowsetWriter::create_segcompaction_writer(
+        uint64_t begin, uint64_t end) {
+    Status status;
+    std::unique_ptr<segment_v2::SegmentWriter> writer = nullptr;
+    status = _create_segment_writer_for_segcompaction(&writer, begin, end);
+    if (status != Status::OK()) {
+        writer = nullptr;
+        LOG(ERROR) << "failed to create segment writer for begin:" << begin << 
" end:" << end
+                   << " path:" << writer->get_data_dir()->path();
+    }
+    if (writer->get_data_dir())
+        LOG(INFO) << "segcompaction segment writer created for begin:" << 
begin << " end:" << end
+                  << " path:" << writer->get_data_dir()->path();
+    return writer;
+}
+
+Status BetaRowsetWriter::delete_original_segments(uint32_t begin, uint32_t 
end) {
+    auto fs = _rowset_meta->fs();
+    if (!fs) {
+        return Status::OLAPInternalError(OLAP_ERR_INIT_FAILED);
+    }
+    for (uint32_t i = begin; i <= end; ++i) {
+        auto seg_path = BetaRowset::local_segment_path(_context.tablet_path, 
_context.rowset_id, i);
+        // Even if an error is encountered, these files that have not been 
cleaned up
+        // will be cleaned up by the GC background. So here we only print the 
error
+        // message when we encounter an error.
+        WARN_IF_ERROR(fs->delete_file(seg_path),
+                      strings::Substitute("Failed to delete file=$0", 
seg_path));
+    }
+    return Status::OK();
+}
+
+void BetaRowsetWriter::rename_compacted_segments(int64_t begin, int64_t end) {
+    int ret;
+    auto src_seg_path = 
BetaRowset::local_segment_path_segcompacted(_context.tablet_path,
+                                                                    
_context.rowset_id, begin, end);
+    auto dst_seg_path = BetaRowset::local_segment_path(_context.tablet_path, 
_context.rowset_id,
+                                                       _num_segcompacted++);
+    ret = rename(src_seg_path.c_str(), dst_seg_path.c_str());
+    DCHECK_EQ(ret, 0);
+}
+
+// todo: will rename only do the job? maybe need deep modification
+void BetaRowsetWriter::rename_compacted_segment_plain(uint64_t seg_id) {
+    int ret;
+    auto src_seg_path =
+            BetaRowset::local_segment_path(_context.tablet_path, 
_context.rowset_id, seg_id);
+    auto dst_seg_path = BetaRowset::local_segment_path(_context.tablet_path, 
_context.rowset_id,
+                                                       _num_segcompacted++);
+    LOG(INFO) << "segcompaction skip this segment. rename " << src_seg_path << 
" to "
+              << dst_seg_path;
+    if (src_seg_path.compare(dst_seg_path) != 0) {
+        CHECK_EQ(_segid_statistics_map.find(seg_id + 1) == 
_segid_statistics_map.end(), false);
+        CHECK_EQ(_segid_statistics_map.find(_num_segcompacted) == 
_segid_statistics_map.end(),
+                 true);
+        statistics org = _segid_statistics_map[seg_id + 1];
+        _segid_statistics_map.emplace(_num_segcompacted, org);
+        clear_statistics_for_deleting_segments(seg_id, seg_id);
+        ret = rename(src_seg_path.c_str(), dst_seg_path.c_str());
+        DCHECK_EQ(ret, 0);
+    }
+}
+
+void BetaRowsetWriter::clear_statistics_for_deleting_segments(uint64_t begin, 
uint64_t end) {
+    LOG(INFO) << "_segid_statistics_map clear record segid range from:" << 
begin + 1
+              << " to:" << end + 1;
+    for (int i = begin; i <= end; ++i) {
+        _segid_statistics_map.erase(i + 1);
+    }
+}
+
+Status BetaRowsetWriter::do_segcompaction(SegCompactionCandidatesSharedPtr 
segments) {
+    uint64_t begin = (*(segments->begin()))->id();
+    uint64_t end = (*(segments->end() - 1))->id();
+    LOG(INFO) << "BetaRowsetWriter:" << this << " do segcompaction at " << 
segments->size()
+              << " segments. Begin:" << begin << " End:" << end;
+    uint64_t begin_time = GetCurrentTimeMicros();
+
+    auto schema = std::make_shared<Schema>(_context.tablet_schema->columns(),
+                                           
_context.tablet_schema->columns().size());
+    std::unique_ptr<OlapReaderStatistics> stat(new OlapReaderStatistics());
+    vectorized::VMergeIterator* reader = get_segcompaction_reader(segments, 
schema, stat.get());
+    auto writer = create_segcompaction_writer(begin, end);
+    assert(writer != nullptr);
+    vectorized::Block block = _context.tablet_schema->create_block();
+    while (true) {
+        auto status = reader->next_batch(&block);
+        if (status != Status::OK()) {
+            assert(status.is_end_of_file());
+            break;
+        }
+        _add_block_for_segcompaction(&block, &writer);
+        block.clear_column_data();
+    }
+    clear_statistics_for_deleting_segments(begin, end);
+    RETURN_NOT_OK(_flush_segment_writer(&writer));
+    delete_original_segments(begin, end);

Review Comment:
   RETURN_IF_ERROR



##########
be/src/olap/rowset/beta_rowset_writer.cpp:
##########
@@ -102,6 +110,284 @@ Status BetaRowsetWriter::add_block(const 
vectorized::Block* block) {
     return _add_block(block, &_segment_writer);
 }
 
+vectorized::VMergeIterator* BetaRowsetWriter::get_segcompaction_reader(
+        SegCompactionCandidatesSharedPtr segments, std::shared_ptr<Schema> 
schema,
+        OlapReaderStatistics* stat) {
+    StorageReadOptions read_options;
+    read_options.stats = stat;
+    read_options.use_page_cache = false;
+    read_options.tablet_schema = _context.tablet_schema;
+    std::vector<std::unique_ptr<RowwiseIterator>> seg_iterators;
+    for (auto& seg_ptr : *segments) {
+        std::unique_ptr<RowwiseIterator> iter;
+        auto s = seg_ptr->new_iterator(*schema, read_options, &iter);
+        if (!s.ok()) {
+            LOG(WARNING) << "failed to create iterator[" << seg_ptr->id() << 
"]: " << s.to_string();
+        }
+        seg_iterators.push_back(std::move(iter));
+    }
+    std::vector<RowwiseIterator*> iterators;
+    for (auto& owned_it : seg_iterators) {
+        // transfer ownership
+        iterators.push_back(owned_it.release());
+    }
+    bool is_unique = (_context.tablet_schema->keys_type() == UNIQUE_KEYS);
+    bool is_reverse = false;
+    auto merge_itr = vectorized::new_merge_iterator(iterators, -1, is_unique, 
is_reverse, nullptr);
+    merge_itr->init(read_options);
+
+    return (vectorized::VMergeIterator*)merge_itr;
+}
+
+std::unique_ptr<segment_v2::SegmentWriter> 
BetaRowsetWriter::create_segcompaction_writer(
+        uint64_t begin, uint64_t end) {
+    Status status;
+    std::unique_ptr<segment_v2::SegmentWriter> writer = nullptr;
+    status = _create_segment_writer_for_segcompaction(&writer, begin, end);
+    if (status != Status::OK()) {
+        writer = nullptr;
+        LOG(ERROR) << "failed to create segment writer for begin:" << begin << 
" end:" << end
+                   << " path:" << writer->get_data_dir()->path();
+    }
+    if (writer->get_data_dir())
+        LOG(INFO) << "segcompaction segment writer created for begin:" << 
begin << " end:" << end
+                  << " path:" << writer->get_data_dir()->path();
+    return writer;
+}
+
+Status BetaRowsetWriter::delete_original_segments(uint32_t begin, uint32_t 
end) {
+    auto fs = _rowset_meta->fs();
+    if (!fs) {
+        return Status::OLAPInternalError(OLAP_ERR_INIT_FAILED);
+    }
+    for (uint32_t i = begin; i <= end; ++i) {
+        auto seg_path = BetaRowset::local_segment_path(_context.tablet_path, 
_context.rowset_id, i);
+        // Even if an error is encountered, these files that have not been 
cleaned up
+        // will be cleaned up by the GC background. So here we only print the 
error
+        // message when we encounter an error.
+        WARN_IF_ERROR(fs->delete_file(seg_path),
+                      strings::Substitute("Failed to delete file=$0", 
seg_path));
+    }
+    return Status::OK();
+}
+
+void BetaRowsetWriter::rename_compacted_segments(int64_t begin, int64_t end) {
+    int ret;
+    auto src_seg_path = 
BetaRowset::local_segment_path_segcompacted(_context.tablet_path,
+                                                                    
_context.rowset_id, begin, end);
+    auto dst_seg_path = BetaRowset::local_segment_path(_context.tablet_path, 
_context.rowset_id,
+                                                       _num_segcompacted++);
+    ret = rename(src_seg_path.c_str(), dst_seg_path.c_str());
+    DCHECK_EQ(ret, 0);
+}
+
+// todo: will rename only do the job? maybe need deep modification
+void BetaRowsetWriter::rename_compacted_segment_plain(uint64_t seg_id) {
+    int ret;
+    auto src_seg_path =
+            BetaRowset::local_segment_path(_context.tablet_path, 
_context.rowset_id, seg_id);
+    auto dst_seg_path = BetaRowset::local_segment_path(_context.tablet_path, 
_context.rowset_id,
+                                                       _num_segcompacted++);
+    LOG(INFO) << "segcompaction skip this segment. rename " << src_seg_path << 
" to "
+              << dst_seg_path;
+    if (src_seg_path.compare(dst_seg_path) != 0) {

Review Comment:
   Would comparing `seg_id` and `_num_segcompacted` be faster?



##########
be/src/olap/rowset/beta_rowset_writer.cpp:
##########
@@ -102,6 +110,284 @@ Status BetaRowsetWriter::add_block(const 
vectorized::Block* block) {
     return _add_block(block, &_segment_writer);
 }
 
+vectorized::VMergeIterator* BetaRowsetWriter::get_segcompaction_reader(
+        SegCompactionCandidatesSharedPtr segments, std::shared_ptr<Schema> 
schema,
+        OlapReaderStatistics* stat) {
+    StorageReadOptions read_options;
+    read_options.stats = stat;
+    read_options.use_page_cache = false;
+    read_options.tablet_schema = _context.tablet_schema;
+    std::vector<std::unique_ptr<RowwiseIterator>> seg_iterators;
+    for (auto& seg_ptr : *segments) {
+        std::unique_ptr<RowwiseIterator> iter;
+        auto s = seg_ptr->new_iterator(*schema, read_options, &iter);
+        if (!s.ok()) {
+            LOG(WARNING) << "failed to create iterator[" << seg_ptr->id() << 
"]: " << s.to_string();
+        }
+        seg_iterators.push_back(std::move(iter));
+    }
+    std::vector<RowwiseIterator*> iterators;
+    for (auto& owned_it : seg_iterators) {
+        // transfer ownership
+        iterators.push_back(owned_it.release());
+    }
+    bool is_unique = (_context.tablet_schema->keys_type() == UNIQUE_KEYS);
+    bool is_reverse = false;
+    auto merge_itr = vectorized::new_merge_iterator(iterators, -1, is_unique, 
is_reverse, nullptr);
+    merge_itr->init(read_options);
+
+    return (vectorized::VMergeIterator*)merge_itr;
+}
+
+std::unique_ptr<segment_v2::SegmentWriter> 
BetaRowsetWriter::create_segcompaction_writer(
+        uint64_t begin, uint64_t end) {
+    Status status;
+    std::unique_ptr<segment_v2::SegmentWriter> writer = nullptr;
+    status = _create_segment_writer_for_segcompaction(&writer, begin, end);
+    if (status != Status::OK()) {
+        writer = nullptr;
+        LOG(ERROR) << "failed to create segment writer for begin:" << begin << 
" end:" << end
+                   << " path:" << writer->get_data_dir()->path();
+    }
+    if (writer->get_data_dir())
+        LOG(INFO) << "segcompaction segment writer created for begin:" << 
begin << " end:" << end
+                  << " path:" << writer->get_data_dir()->path();
+    return writer;
+}
+
+Status BetaRowsetWriter::delete_original_segments(uint32_t begin, uint32_t 
end) {
+    auto fs = _rowset_meta->fs();
+    if (!fs) {
+        return Status::OLAPInternalError(OLAP_ERR_INIT_FAILED);
+    }
+    for (uint32_t i = begin; i <= end; ++i) {
+        auto seg_path = BetaRowset::local_segment_path(_context.tablet_path, 
_context.rowset_id, i);
+        // Even if an error is encountered, these files that have not been 
cleaned up
+        // will be cleaned up by the GC background. So here we only print the 
error
+        // message when we encounter an error.
+        WARN_IF_ERROR(fs->delete_file(seg_path),
+                      strings::Substitute("Failed to delete file=$0", 
seg_path));
+    }
+    return Status::OK();
+}
+
+void BetaRowsetWriter::rename_compacted_segments(int64_t begin, int64_t end) {
+    int ret;
+    auto src_seg_path = 
BetaRowset::local_segment_path_segcompacted(_context.tablet_path,
+                                                                    
_context.rowset_id, begin, end);
+    auto dst_seg_path = BetaRowset::local_segment_path(_context.tablet_path, 
_context.rowset_id,
+                                                       _num_segcompacted++);
+    ret = rename(src_seg_path.c_str(), dst_seg_path.c_str());
+    DCHECK_EQ(ret, 0);
+}
+
+// todo: will rename only do the job? maybe need deep modification

Review Comment:
   What does this comment mean?



##########
be/src/olap/rowset/beta_rowset_writer.cpp:
##########
@@ -102,6 +110,284 @@ Status BetaRowsetWriter::add_block(const 
vectorized::Block* block) {
     return _add_block(block, &_segment_writer);
 }
 
+vectorized::VMergeIterator* BetaRowsetWriter::get_segcompaction_reader(
+        SegCompactionCandidatesSharedPtr segments, std::shared_ptr<Schema> 
schema,
+        OlapReaderStatistics* stat) {
+    StorageReadOptions read_options;
+    read_options.stats = stat;
+    read_options.use_page_cache = false;
+    read_options.tablet_schema = _context.tablet_schema;
+    std::vector<std::unique_ptr<RowwiseIterator>> seg_iterators;
+    for (auto& seg_ptr : *segments) {
+        std::unique_ptr<RowwiseIterator> iter;
+        auto s = seg_ptr->new_iterator(*schema, read_options, &iter);
+        if (!s.ok()) {
+            LOG(WARNING) << "failed to create iterator[" << seg_ptr->id() << 
"]: " << s.to_string();
+        }
+        seg_iterators.push_back(std::move(iter));
+    }
+    std::vector<RowwiseIterator*> iterators;
+    for (auto& owned_it : seg_iterators) {
+        // transfer ownership
+        iterators.push_back(owned_it.release());
+    }
+    bool is_unique = (_context.tablet_schema->keys_type() == UNIQUE_KEYS);
+    bool is_reverse = false;
+    auto merge_itr = vectorized::new_merge_iterator(iterators, -1, is_unique, 
is_reverse, nullptr);
+    merge_itr->init(read_options);
+
+    return (vectorized::VMergeIterator*)merge_itr;
+}
+
+std::unique_ptr<segment_v2::SegmentWriter> 
BetaRowsetWriter::create_segcompaction_writer(
+        uint64_t begin, uint64_t end) {
+    Status status;
+    std::unique_ptr<segment_v2::SegmentWriter> writer = nullptr;
+    status = _create_segment_writer_for_segcompaction(&writer, begin, end);
+    if (status != Status::OK()) {
+        writer = nullptr;
+        LOG(ERROR) << "failed to create segment writer for begin:" << begin << 
" end:" << end
+                   << " path:" << writer->get_data_dir()->path();
+    }
+    if (writer->get_data_dir())
+        LOG(INFO) << "segcompaction segment writer created for begin:" << 
begin << " end:" << end
+                  << " path:" << writer->get_data_dir()->path();
+    return writer;
+}
+
+Status BetaRowsetWriter::delete_original_segments(uint32_t begin, uint32_t 
end) {
+    auto fs = _rowset_meta->fs();
+    if (!fs) {
+        return Status::OLAPInternalError(OLAP_ERR_INIT_FAILED);
+    }
+    for (uint32_t i = begin; i <= end; ++i) {
+        auto seg_path = BetaRowset::local_segment_path(_context.tablet_path, 
_context.rowset_id, i);
+        // Even if an error is encountered, these files that have not been 
cleaned up
+        // will be cleaned up by the GC background. So here we only print the 
error
+        // message when we encounter an error.
+        WARN_IF_ERROR(fs->delete_file(seg_path),
+                      strings::Substitute("Failed to delete file=$0", 
seg_path));
+    }
+    return Status::OK();
+}
+
+void BetaRowsetWriter::rename_compacted_segments(int64_t begin, int64_t end) {
+    int ret;
+    auto src_seg_path = 
BetaRowset::local_segment_path_segcompacted(_context.tablet_path,
+                                                                    
_context.rowset_id, begin, end);
+    auto dst_seg_path = BetaRowset::local_segment_path(_context.tablet_path, 
_context.rowset_id,
+                                                       _num_segcompacted++);
+    ret = rename(src_seg_path.c_str(), dst_seg_path.c_str());
+    DCHECK_EQ(ret, 0);
+}
+
+// todo: will rename only do the job? maybe need deep modification
+void BetaRowsetWriter::rename_compacted_segment_plain(uint64_t seg_id) {
+    int ret;
+    auto src_seg_path =
+            BetaRowset::local_segment_path(_context.tablet_path, 
_context.rowset_id, seg_id);
+    auto dst_seg_path = BetaRowset::local_segment_path(_context.tablet_path, 
_context.rowset_id,
+                                                       _num_segcompacted++);
+    LOG(INFO) << "segcompaction skip this segment. rename " << src_seg_path << 
" to "
+              << dst_seg_path;
+    if (src_seg_path.compare(dst_seg_path) != 0) {
+        CHECK_EQ(_segid_statistics_map.find(seg_id + 1) == 
_segid_statistics_map.end(), false);
+        CHECK_EQ(_segid_statistics_map.find(_num_segcompacted) == 
_segid_statistics_map.end(),
+                 true);
+        statistics org = _segid_statistics_map[seg_id + 1];
+        _segid_statistics_map.emplace(_num_segcompacted, org);
+        clear_statistics_for_deleting_segments(seg_id, seg_id);
+        ret = rename(src_seg_path.c_str(), dst_seg_path.c_str());
+        DCHECK_EQ(ret, 0);
+    }
+}
+
+void BetaRowsetWriter::clear_statistics_for_deleting_segments(uint64_t begin, 
uint64_t end) {
+    LOG(INFO) << "_segid_statistics_map clear record segid range from:" << 
begin + 1
+              << " to:" << end + 1;
+    for (int i = begin; i <= end; ++i) {
+        _segid_statistics_map.erase(i + 1);
+    }
+}
+
+Status BetaRowsetWriter::do_segcompaction(SegCompactionCandidatesSharedPtr 
segments) {
+    uint64_t begin = (*(segments->begin()))->id();
+    uint64_t end = (*(segments->end() - 1))->id();
+    LOG(INFO) << "BetaRowsetWriter:" << this << " do segcompaction at " << 
segments->size()
+              << " segments. Begin:" << begin << " End:" << end;
+    uint64_t begin_time = GetCurrentTimeMicros();
+
+    auto schema = std::make_shared<Schema>(_context.tablet_schema->columns(),
+                                           
_context.tablet_schema->columns().size());
+    std::unique_ptr<OlapReaderStatistics> stat(new OlapReaderStatistics());
+    vectorized::VMergeIterator* reader = get_segcompaction_reader(segments, 
schema, stat.get());
+    auto writer = create_segcompaction_writer(begin, end);
+    assert(writer != nullptr);

Review Comment:
   We should not crash the whole process just because an error happens during 
segment compaction.



##########
be/src/olap/rowset/beta_rowset_writer.cpp:
##########
@@ -102,6 +110,284 @@ Status BetaRowsetWriter::add_block(const 
vectorized::Block* block) {
     return _add_block(block, &_segment_writer);
 }
 
+vectorized::VMergeIterator* BetaRowsetWriter::get_segcompaction_reader(
+        SegCompactionCandidatesSharedPtr segments, std::shared_ptr<Schema> 
schema,
+        OlapReaderStatistics* stat) {
+    StorageReadOptions read_options;
+    read_options.stats = stat;
+    read_options.use_page_cache = false;
+    read_options.tablet_schema = _context.tablet_schema;
+    std::vector<std::unique_ptr<RowwiseIterator>> seg_iterators;
+    for (auto& seg_ptr : *segments) {
+        std::unique_ptr<RowwiseIterator> iter;
+        auto s = seg_ptr->new_iterator(*schema, read_options, &iter);
+        if (!s.ok()) {
+            LOG(WARNING) << "failed to create iterator[" << seg_ptr->id() << 
"]: " << s.to_string();
+        }
+        seg_iterators.push_back(std::move(iter));
+    }
+    std::vector<RowwiseIterator*> iterators;
+    for (auto& owned_it : seg_iterators) {
+        // transfer ownership
+        iterators.push_back(owned_it.release());
+    }
+    bool is_unique = (_context.tablet_schema->keys_type() == UNIQUE_KEYS);
+    bool is_reverse = false;
+    auto merge_itr = vectorized::new_merge_iterator(iterators, -1, is_unique, 
is_reverse, nullptr);
+    merge_itr->init(read_options);
+
+    return (vectorized::VMergeIterator*)merge_itr;
+}
+
+std::unique_ptr<segment_v2::SegmentWriter> 
BetaRowsetWriter::create_segcompaction_writer(
+        uint64_t begin, uint64_t end) {
+    Status status;
+    std::unique_ptr<segment_v2::SegmentWriter> writer = nullptr;
+    status = _create_segment_writer_for_segcompaction(&writer, begin, end);
+    if (status != Status::OK()) {
+        writer = nullptr;
+        LOG(ERROR) << "failed to create segment writer for begin:" << begin << 
" end:" << end
+                   << " path:" << writer->get_data_dir()->path();
+    }
+    if (writer->get_data_dir())
+        LOG(INFO) << "segcompaction segment writer created for begin:" << 
begin << " end:" << end
+                  << " path:" << writer->get_data_dir()->path();
+    return writer;
+}
+
+Status BetaRowsetWriter::delete_original_segments(uint32_t begin, uint32_t 
end) {
+    auto fs = _rowset_meta->fs();
+    if (!fs) {
+        return Status::OLAPInternalError(OLAP_ERR_INIT_FAILED);
+    }
+    for (uint32_t i = begin; i <= end; ++i) {
+        auto seg_path = BetaRowset::local_segment_path(_context.tablet_path, 
_context.rowset_id, i);
+        // Even if an error is encountered, these files that have not been 
cleaned up
+        // will be cleaned up by the GC background. So here we only print the 
error
+        // message when we encounter an error.
+        WARN_IF_ERROR(fs->delete_file(seg_path),
+                      strings::Substitute("Failed to delete file=$0", 
seg_path));
+    }
+    return Status::OK();
+}
+
+void BetaRowsetWriter::rename_compacted_segments(int64_t begin, int64_t end) {
+    int ret;
+    auto src_seg_path = 
BetaRowset::local_segment_path_segcompacted(_context.tablet_path,
+                                                                    
_context.rowset_id, begin, end);
+    auto dst_seg_path = BetaRowset::local_segment_path(_context.tablet_path, 
_context.rowset_id,
+                                                       _num_segcompacted++);
+    ret = rename(src_seg_path.c_str(), dst_seg_path.c_str());
+    DCHECK_EQ(ret, 0);
+}
+
+// todo: will rename only do the job? maybe need deep modification
+void BetaRowsetWriter::rename_compacted_segment_plain(uint64_t seg_id) {
+    int ret;
+    auto src_seg_path =
+            BetaRowset::local_segment_path(_context.tablet_path, 
_context.rowset_id, seg_id);
+    auto dst_seg_path = BetaRowset::local_segment_path(_context.tablet_path, 
_context.rowset_id,
+                                                       _num_segcompacted++);
+    LOG(INFO) << "segcompaction skip this segment. rename " << src_seg_path << 
" to "
+              << dst_seg_path;
+    if (src_seg_path.compare(dst_seg_path) != 0) {
+        CHECK_EQ(_segid_statistics_map.find(seg_id + 1) == 
_segid_statistics_map.end(), false);
+        CHECK_EQ(_segid_statistics_map.find(_num_segcompacted) == 
_segid_statistics_map.end(),
+                 true);
+        statistics org = _segid_statistics_map[seg_id + 1];
+        _segid_statistics_map.emplace(_num_segcompacted, org);
+        clear_statistics_for_deleting_segments(seg_id, seg_id);
+        ret = rename(src_seg_path.c_str(), dst_seg_path.c_str());

Review Comment:
   ditto, we should also let the caller know that something is wrong.



##########
be/src/olap/rowset/beta_rowset_writer.cpp:
##########
@@ -102,6 +110,284 @@ Status BetaRowsetWriter::add_block(const 
vectorized::Block* block) {
     return _add_block(block, &_segment_writer);
 }
 
+vectorized::VMergeIterator* BetaRowsetWriter::get_segcompaction_reader(
+        SegCompactionCandidatesSharedPtr segments, std::shared_ptr<Schema> 
schema,
+        OlapReaderStatistics* stat) {
+    StorageReadOptions read_options;
+    read_options.stats = stat;
+    read_options.use_page_cache = false;
+    read_options.tablet_schema = _context.tablet_schema;
+    std::vector<std::unique_ptr<RowwiseIterator>> seg_iterators;
+    for (auto& seg_ptr : *segments) {
+        std::unique_ptr<RowwiseIterator> iter;
+        auto s = seg_ptr->new_iterator(*schema, read_options, &iter);
+        if (!s.ok()) {
+            LOG(WARNING) << "failed to create iterator[" << seg_ptr->id() << 
"]: " << s.to_string();
+        }
+        seg_iterators.push_back(std::move(iter));
+    }
+    std::vector<RowwiseIterator*> iterators;
+    for (auto& owned_it : seg_iterators) {
+        // transfer ownership
+        iterators.push_back(owned_it.release());
+    }
+    bool is_unique = (_context.tablet_schema->keys_type() == UNIQUE_KEYS);
+    bool is_reverse = false;
+    auto merge_itr = vectorized::new_merge_iterator(iterators, -1, is_unique, 
is_reverse, nullptr);
+    merge_itr->init(read_options);
+
+    return (vectorized::VMergeIterator*)merge_itr;
+}
+
+std::unique_ptr<segment_v2::SegmentWriter> 
BetaRowsetWriter::create_segcompaction_writer(
+        uint64_t begin, uint64_t end) {
+    Status status;
+    std::unique_ptr<segment_v2::SegmentWriter> writer = nullptr;
+    status = _create_segment_writer_for_segcompaction(&writer, begin, end);
+    if (status != Status::OK()) {
+        writer = nullptr;
+        LOG(ERROR) << "failed to create segment writer for begin:" << begin << 
" end:" << end
+                   << " path:" << writer->get_data_dir()->path();
+    }
+    if (writer->get_data_dir())
+        LOG(INFO) << "segcompaction segment writer created for begin:" << 
begin << " end:" << end
+                  << " path:" << writer->get_data_dir()->path();
+    return writer;
+}
+
+Status BetaRowsetWriter::delete_original_segments(uint32_t begin, uint32_t 
end) {
+    auto fs = _rowset_meta->fs();
+    if (!fs) {
+        return Status::OLAPInternalError(OLAP_ERR_INIT_FAILED);
+    }
+    for (uint32_t i = begin; i <= end; ++i) {
+        auto seg_path = BetaRowset::local_segment_path(_context.tablet_path, 
_context.rowset_id, i);
+        // Even if an error is encountered, these files that have not been 
cleaned up
+        // will be cleaned up by the GC background. So here we only print the 
error
+        // message when we encounter an error.
+        WARN_IF_ERROR(fs->delete_file(seg_path),
+                      strings::Substitute("Failed to delete file=$0", 
seg_path));
+    }
+    return Status::OK();
+}
+
+void BetaRowsetWriter::rename_compacted_segments(int64_t begin, int64_t end) {
+    int ret;
+    auto src_seg_path = 
BetaRowset::local_segment_path_segcompacted(_context.tablet_path,
+                                                                    
_context.rowset_id, begin, end);
+    auto dst_seg_path = BetaRowset::local_segment_path(_context.tablet_path, 
_context.rowset_id,
+                                                       _num_segcompacted++);
+    ret = rename(src_seg_path.c_str(), dst_seg_path.c_str());
+    DCHECK_EQ(ret, 0);
+}
+
+// todo: will rename only do the job? maybe need deep modification
+void BetaRowsetWriter::rename_compacted_segment_plain(uint64_t seg_id) {
+    int ret;
+    auto src_seg_path =
+            BetaRowset::local_segment_path(_context.tablet_path, 
_context.rowset_id, seg_id);
+    auto dst_seg_path = BetaRowset::local_segment_path(_context.tablet_path, 
_context.rowset_id,
+                                                       _num_segcompacted++);
+    LOG(INFO) << "segcompaction skip this segment. rename " << src_seg_path << 
" to "
+              << dst_seg_path;
+    if (src_seg_path.compare(dst_seg_path) != 0) {
+        CHECK_EQ(_segid_statistics_map.find(seg_id + 1) == 
_segid_statistics_map.end(), false);
+        CHECK_EQ(_segid_statistics_map.find(_num_segcompacted) == 
_segid_statistics_map.end(),
+                 true);
+        statistics org = _segid_statistics_map[seg_id + 1];
+        _segid_statistics_map.emplace(_num_segcompacted, org);
+        clear_statistics_for_deleting_segments(seg_id, seg_id);
+        ret = rename(src_seg_path.c_str(), dst_seg_path.c_str());
+        DCHECK_EQ(ret, 0);
+    }
+}
+
+void BetaRowsetWriter::clear_statistics_for_deleting_segments(uint64_t begin, 
uint64_t end) {
+    LOG(INFO) << "_segid_statistics_map clear record segid range from:" << 
begin + 1
+              << " to:" << end + 1;
+    for (int i = begin; i <= end; ++i) {
+        _segid_statistics_map.erase(i + 1);
+    }
+}
+
+Status BetaRowsetWriter::do_segcompaction(SegCompactionCandidatesSharedPtr 
segments) {
+    uint64_t begin = (*(segments->begin()))->id();
+    uint64_t end = (*(segments->end() - 1))->id();
+    LOG(INFO) << "BetaRowsetWriter:" << this << " do segcompaction at " << 
segments->size()
+              << " segments. Begin:" << begin << " End:" << end;
+    uint64_t begin_time = GetCurrentTimeMicros();
+
+    auto schema = std::make_shared<Schema>(_context.tablet_schema->columns(),
+                                           
_context.tablet_schema->columns().size());
+    std::unique_ptr<OlapReaderStatistics> stat(new OlapReaderStatistics());
+    vectorized::VMergeIterator* reader = get_segcompaction_reader(segments, 
schema, stat.get());
+    auto writer = create_segcompaction_writer(begin, end);
+    assert(writer != nullptr);
+    vectorized::Block block = _context.tablet_schema->create_block();
+    while (true) {
+        auto status = reader->next_batch(&block);
+        if (status != Status::OK()) {
+            assert(status.is_end_of_file());

Review Comment:
   ditto, don't use assert lightly.



##########
be/src/olap/rowset/beta_rowset_writer.cpp:
##########
@@ -102,6 +110,284 @@ Status BetaRowsetWriter::add_block(const 
vectorized::Block* block) {
     return _add_block(block, &_segment_writer);
 }
 
+vectorized::VMergeIterator* BetaRowsetWriter::get_segcompaction_reader(
+        SegCompactionCandidatesSharedPtr segments, std::shared_ptr<Schema> 
schema,
+        OlapReaderStatistics* stat) {
+    StorageReadOptions read_options;
+    read_options.stats = stat;
+    read_options.use_page_cache = false;
+    read_options.tablet_schema = _context.tablet_schema;
+    std::vector<std::unique_ptr<RowwiseIterator>> seg_iterators;
+    for (auto& seg_ptr : *segments) {
+        std::unique_ptr<RowwiseIterator> iter;
+        auto s = seg_ptr->new_iterator(*schema, read_options, &iter);
+        if (!s.ok()) {
+            LOG(WARNING) << "failed to create iterator[" << seg_ptr->id() << 
"]: " << s.to_string();
+        }
+        seg_iterators.push_back(std::move(iter));
+    }
+    std::vector<RowwiseIterator*> iterators;
+    for (auto& owned_it : seg_iterators) {
+        // transfer ownership
+        iterators.push_back(owned_it.release());
+    }
+    bool is_unique = (_context.tablet_schema->keys_type() == UNIQUE_KEYS);
+    bool is_reverse = false;
+    auto merge_itr = vectorized::new_merge_iterator(iterators, -1, is_unique, 
is_reverse, nullptr);
+    merge_itr->init(read_options);
+
+    return (vectorized::VMergeIterator*)merge_itr;
+}
+
+std::unique_ptr<segment_v2::SegmentWriter> 
BetaRowsetWriter::create_segcompaction_writer(
+        uint64_t begin, uint64_t end) {
+    Status status;
+    std::unique_ptr<segment_v2::SegmentWriter> writer = nullptr;
+    status = _create_segment_writer_for_segcompaction(&writer, begin, end);
+    if (status != Status::OK()) {
+        writer = nullptr;
+        LOG(ERROR) << "failed to create segment writer for begin:" << begin << 
" end:" << end
+                   << " path:" << writer->get_data_dir()->path();
+    }
+    if (writer->get_data_dir())
+        LOG(INFO) << "segcompaction segment writer created for begin:" << 
begin << " end:" << end
+                  << " path:" << writer->get_data_dir()->path();
+    return writer;
+}
+
+Status BetaRowsetWriter::delete_original_segments(uint32_t begin, uint32_t 
end) {
+    auto fs = _rowset_meta->fs();
+    if (!fs) {
+        return Status::OLAPInternalError(OLAP_ERR_INIT_FAILED);
+    }
+    for (uint32_t i = begin; i <= end; ++i) {
+        auto seg_path = BetaRowset::local_segment_path(_context.tablet_path, 
_context.rowset_id, i);
+        // Even if an error is encountered, these files that have not been 
cleaned up
+        // will be cleaned up by the GC background. So here we only print the 
error
+        // message when we encounter an error.
+        WARN_IF_ERROR(fs->delete_file(seg_path),
+                      strings::Substitute("Failed to delete file=$0", 
seg_path));
+    }
+    return Status::OK();
+}
+
+void BetaRowsetWriter::rename_compacted_segments(int64_t begin, int64_t end) {
+    int ret;
+    auto src_seg_path = 
BetaRowset::local_segment_path_segcompacted(_context.tablet_path,
+                                                                    
_context.rowset_id, begin, end);
+    auto dst_seg_path = BetaRowset::local_segment_path(_context.tablet_path, 
_context.rowset_id,
+                                                       _num_segcompacted++);
+    ret = rename(src_seg_path.c_str(), dst_seg_path.c_str());
+    DCHECK_EQ(ret, 0);
+}
+
+// todo: will rename only do the job? maybe need deep modification
+void BetaRowsetWriter::rename_compacted_segment_plain(uint64_t seg_id) {
+    int ret;
+    auto src_seg_path =
+            BetaRowset::local_segment_path(_context.tablet_path, 
_context.rowset_id, seg_id);
+    auto dst_seg_path = BetaRowset::local_segment_path(_context.tablet_path, 
_context.rowset_id,
+                                                       _num_segcompacted++);
+    LOG(INFO) << "segcompaction skip this segment. rename " << src_seg_path << 
" to "
+              << dst_seg_path;
+    if (src_seg_path.compare(dst_seg_path) != 0) {
+        CHECK_EQ(_segid_statistics_map.find(seg_id + 1) == 
_segid_statistics_map.end(), false);
+        CHECK_EQ(_segid_statistics_map.find(_num_segcompacted) == 
_segid_statistics_map.end(),
+                 true);
+        statistics org = _segid_statistics_map[seg_id + 1];
+        _segid_statistics_map.emplace(_num_segcompacted, org);
+        clear_statistics_for_deleting_segments(seg_id, seg_id);
+        ret = rename(src_seg_path.c_str(), dst_seg_path.c_str());
+        DCHECK_EQ(ret, 0);
+    }
+}
+
+void BetaRowsetWriter::clear_statistics_for_deleting_segments(uint64_t begin, 
uint64_t end) {
+    LOG(INFO) << "_segid_statistics_map clear record segid range from:" << 
begin + 1
+              << " to:" << end + 1;
+    for (int i = begin; i <= end; ++i) {
+        _segid_statistics_map.erase(i + 1);
+    }
+}
+
+Status BetaRowsetWriter::do_segcompaction(SegCompactionCandidatesSharedPtr 
segments) {
+    uint64_t begin = (*(segments->begin()))->id();
+    uint64_t end = (*(segments->end() - 1))->id();
+    LOG(INFO) << "BetaRowsetWriter:" << this << " do segcompaction at " << 
segments->size()
+              << " segments. Begin:" << begin << " End:" << end;
+    uint64_t begin_time = GetCurrentTimeMicros();
+
+    auto schema = std::make_shared<Schema>(_context.tablet_schema->columns(),
+                                           
_context.tablet_schema->columns().size());
+    std::unique_ptr<OlapReaderStatistics> stat(new OlapReaderStatistics());
+    vectorized::VMergeIterator* reader = get_segcompaction_reader(segments, 
schema, stat.get());
+    auto writer = create_segcompaction_writer(begin, end);
+    assert(writer != nullptr);
+    vectorized::Block block = _context.tablet_schema->create_block();
+    while (true) {
+        auto status = reader->next_batch(&block);
+        if (status != Status::OK()) {
+            assert(status.is_end_of_file());
+            break;
+        }
+        _add_block_for_segcompaction(&block, &writer);
+        block.clear_column_data();
+    }
+    clear_statistics_for_deleting_segments(begin, end);
+    RETURN_NOT_OK(_flush_segment_writer(&writer));
+    delete_original_segments(begin, end);
+    rename_compacted_segments(begin, end);
+
+    if (VLOG_DEBUG_IS_ON) {
+        std::stringstream ss;
+        for (const auto& entry : 
std::filesystem::directory_iterator(_context.tablet_path)) {
+            ss << "[" << entry.path() << "]";
+        }
+        VLOG_DEBUG << "_segcompacted_point:" << _segcompacted_point
+                   << " _num_segment:" << _num_segment << " 
_num_segcompacted:" << _num_segcompacted
+                   << " list directory:" << ss.str();
+    }
+
+    CHECK_EQ(_is_doing_segcompaction, true);
+    _is_doing_segcompaction = false;
+    _segcompacting_cond.notify_all();
+
+    uint64_t elapsed = GetCurrentTimeMicros() - begin_time;
+    LOG(INFO) << "BetaRowsetWriter:" << this << " segcompaction completed. 
elapsed time:" << elapsed
+              << "us";
+
+    return Status::OK();
+}
+
+Status BetaRowsetWriter::load_noncompacted_segments(
+        std::vector<segment_v2::SegmentSharedPtr>* segments) {
+    auto fs = _rowset_meta->fs();
+    if (!fs) {
+        return Status::OLAPInternalError(OLAP_ERR_INIT_FAILED);
+    }
+    for (int seg_id = _segcompacted_point; seg_id < _num_segment; ++seg_id) {
+        auto seg_path =
+                BetaRowset::local_segment_path(_context.tablet_path, 
_context.rowset_id, seg_id);
+        auto cache_path =
+                BetaRowset::local_cache_path(_context.tablet_path, 
_context.rowset_id, seg_id);
+        std::shared_ptr<segment_v2::Segment> segment;
+        auto s = segment_v2::Segment::open(fs, seg_path, cache_path, seg_id, 
_context.tablet_schema,
+                                           &segment);
+        if (!s.ok()) {
+            LOG(WARNING) << "failed to open segment. " << seg_path << ":" << 
s.to_string();
+            return Status::OLAPInternalError(OLAP_ERR_ROWSET_LOAD_FAILED);
+        }
+        segments->push_back(std::move(segment));
+    }
+    return Status::OK();
+}
+
+void BetaRowsetWriter::find_longest_consecutive_small_segment(
+        SegCompactionCandidatesSharedPtr segments) {
+    std::vector<segment_v2::SegmentSharedPtr> all_segments;
+    load_noncompacted_segments(&all_segments);
+
+    std::stringstream ss_all;
+    for (auto& segment : all_segments) {
+        ss_all << "[id:" << segment->id() << " num_rows:" << 
segment->num_rows() << "]";
+    }
+    LOG(INFO) << "all noncompacted segments num:" << all_segments.size()
+              << " list of segments:" << ss_all.str();
+
+    bool is_terminated_by_big = false;
+    bool let_big_terminate = false;
+    size_t small_threshold = config::segcompaction_small_threshold;
+    for (int64_t i = 0; i < all_segments.size(); ++i) {

Review Comment:
   We'd better add some comments explaining this strategy.



##########
be/src/olap/rowset/beta_rowset_writer.cpp:
##########
@@ -102,6 +110,284 @@ Status BetaRowsetWriter::add_block(const 
vectorized::Block* block) {
     return _add_block(block, &_segment_writer);
 }
 
+vectorized::VMergeIterator* BetaRowsetWriter::get_segcompaction_reader(
+        SegCompactionCandidatesSharedPtr segments, std::shared_ptr<Schema> 
schema,
+        OlapReaderStatistics* stat) {
+    StorageReadOptions read_options;
+    read_options.stats = stat;
+    read_options.use_page_cache = false;
+    read_options.tablet_schema = _context.tablet_schema;
+    std::vector<std::unique_ptr<RowwiseIterator>> seg_iterators;
+    for (auto& seg_ptr : *segments) {
+        std::unique_ptr<RowwiseIterator> iter;
+        auto s = seg_ptr->new_iterator(*schema, read_options, &iter);
+        if (!s.ok()) {
+            LOG(WARNING) << "failed to create iterator[" << seg_ptr->id() << 
"]: " << s.to_string();
+        }
+        seg_iterators.push_back(std::move(iter));
+    }
+    std::vector<RowwiseIterator*> iterators;
+    for (auto& owned_it : seg_iterators) {
+        // transfer ownership
+        iterators.push_back(owned_it.release());
+    }
+    bool is_unique = (_context.tablet_schema->keys_type() == UNIQUE_KEYS);
+    bool is_reverse = false;
+    auto merge_itr = vectorized::new_merge_iterator(iterators, -1, is_unique, 
is_reverse, nullptr);
+    merge_itr->init(read_options);
+
+    return (vectorized::VMergeIterator*)merge_itr;
+}
+
+std::unique_ptr<segment_v2::SegmentWriter> 
BetaRowsetWriter::create_segcompaction_writer(
+        uint64_t begin, uint64_t end) {
+    Status status;
+    std::unique_ptr<segment_v2::SegmentWriter> writer = nullptr;
+    status = _create_segment_writer_for_segcompaction(&writer, begin, end);
+    if (status != Status::OK()) {
+        writer = nullptr;
+        LOG(ERROR) << "failed to create segment writer for begin:" << begin << 
" end:" << end
+                   << " path:" << writer->get_data_dir()->path();
+    }
+    if (writer->get_data_dir())
+        LOG(INFO) << "segcompaction segment writer created for begin:" << 
begin << " end:" << end
+                  << " path:" << writer->get_data_dir()->path();
+    return writer;
+}
+
+Status BetaRowsetWriter::delete_original_segments(uint32_t begin, uint32_t 
end) {
+    auto fs = _rowset_meta->fs();
+    if (!fs) {
+        return Status::OLAPInternalError(OLAP_ERR_INIT_FAILED);
+    }
+    for (uint32_t i = begin; i <= end; ++i) {
+        auto seg_path = BetaRowset::local_segment_path(_context.tablet_path, 
_context.rowset_id, i);
+        // Even if an error is encountered, these files that have not been 
cleaned up
+        // will be cleaned up by the GC background. So here we only print the 
error
+        // message when we encounter an error.
+        WARN_IF_ERROR(fs->delete_file(seg_path),
+                      strings::Substitute("Failed to delete file=$0", 
seg_path));
+    }
+    return Status::OK();
+}
+
+void BetaRowsetWriter::rename_compacted_segments(int64_t begin, int64_t end) {
+    int ret;
+    auto src_seg_path = 
BetaRowset::local_segment_path_segcompacted(_context.tablet_path,
+                                                                    
_context.rowset_id, begin, end);
+    auto dst_seg_path = BetaRowset::local_segment_path(_context.tablet_path, 
_context.rowset_id,
+                                                       _num_segcompacted++);
+    ret = rename(src_seg_path.c_str(), dst_seg_path.c_str());
+    DCHECK_EQ(ret, 0);
+}
+
+// todo: will rename only do the job? maybe need deep modification
+void BetaRowsetWriter::rename_compacted_segment_plain(uint64_t seg_id) {
+    int ret;
+    auto src_seg_path =
+            BetaRowset::local_segment_path(_context.tablet_path, 
_context.rowset_id, seg_id);
+    auto dst_seg_path = BetaRowset::local_segment_path(_context.tablet_path, 
_context.rowset_id,
+                                                       _num_segcompacted++);
+    LOG(INFO) << "segcompaction skip this segment. rename " << src_seg_path << 
" to "
+              << dst_seg_path;
+    if (src_seg_path.compare(dst_seg_path) != 0) {
+        CHECK_EQ(_segid_statistics_map.find(seg_id + 1) == 
_segid_statistics_map.end(), false);
+        CHECK_EQ(_segid_statistics_map.find(_num_segcompacted) == 
_segid_statistics_map.end(),
+                 true);
+        statistics org = _segid_statistics_map[seg_id + 1];
+        _segid_statistics_map.emplace(_num_segcompacted, org);
+        clear_statistics_for_deleting_segments(seg_id, seg_id);
+        ret = rename(src_seg_path.c_str(), dst_seg_path.c_str());
+        DCHECK_EQ(ret, 0);
+    }
+}
+
+void BetaRowsetWriter::clear_statistics_for_deleting_segments(uint64_t begin, 
uint64_t end) {
+    LOG(INFO) << "_segid_statistics_map clear record segid range from:" << 
begin + 1
+              << " to:" << end + 1;
+    for (int i = begin; i <= end; ++i) {
+        _segid_statistics_map.erase(i + 1);
+    }
+}
+
+Status BetaRowsetWriter::do_segcompaction(SegCompactionCandidatesSharedPtr 
segments) {
+    uint64_t begin = (*(segments->begin()))->id();
+    uint64_t end = (*(segments->end() - 1))->id();
+    LOG(INFO) << "BetaRowsetWriter:" << this << " do segcompaction at " << 
segments->size()
+              << " segments. Begin:" << begin << " End:" << end;
+    uint64_t begin_time = GetCurrentTimeMicros();
+
+    auto schema = std::make_shared<Schema>(_context.tablet_schema->columns(),
+                                           
_context.tablet_schema->columns().size());
+    std::unique_ptr<OlapReaderStatistics> stat(new OlapReaderStatistics());
+    vectorized::VMergeIterator* reader = get_segcompaction_reader(segments, 
schema, stat.get());
+    auto writer = create_segcompaction_writer(begin, end);
+    assert(writer != nullptr);
+    vectorized::Block block = _context.tablet_schema->create_block();
+    while (true) {
+        auto status = reader->next_batch(&block);
+        if (status != Status::OK()) {
+            assert(status.is_end_of_file());
+            break;
+        }
+        _add_block_for_segcompaction(&block, &writer);
+        block.clear_column_data();
+    }
+    clear_statistics_for_deleting_segments(begin, end);
+    RETURN_NOT_OK(_flush_segment_writer(&writer));
+    delete_original_segments(begin, end);
+    rename_compacted_segments(begin, end);
+
+    if (VLOG_DEBUG_IS_ON) {
+        std::stringstream ss;
+        for (const auto& entry : 
std::filesystem::directory_iterator(_context.tablet_path)) {
+            ss << "[" << entry.path() << "]";
+        }
+        VLOG_DEBUG << "_segcompacted_point:" << _segcompacted_point
+                   << " _num_segment:" << _num_segment << " 
_num_segcompacted:" << _num_segcompacted
+                   << " list directory:" << ss.str();
+    }
+
+    CHECK_EQ(_is_doing_segcompaction, true);
+    _is_doing_segcompaction = false;
+    _segcompacting_cond.notify_all();
+
+    uint64_t elapsed = GetCurrentTimeMicros() - begin_time;
+    LOG(INFO) << "BetaRowsetWriter:" << this << " segcompaction completed. 
elapsed time:" << elapsed
+              << "us";
+
+    return Status::OK();
+}
+
+Status BetaRowsetWriter::load_noncompacted_segments(
+        std::vector<segment_v2::SegmentSharedPtr>* segments) {
+    auto fs = _rowset_meta->fs();
+    if (!fs) {
+        return Status::OLAPInternalError(OLAP_ERR_INIT_FAILED);
+    }
+    for (int seg_id = _segcompacted_point; seg_id < _num_segment; ++seg_id) {
+        auto seg_path =
+                BetaRowset::local_segment_path(_context.tablet_path, 
_context.rowset_id, seg_id);
+        auto cache_path =
+                BetaRowset::local_cache_path(_context.tablet_path, 
_context.rowset_id, seg_id);
+        std::shared_ptr<segment_v2::Segment> segment;
+        auto s = segment_v2::Segment::open(fs, seg_path, cache_path, seg_id, 
_context.tablet_schema,
+                                           &segment);
+        if (!s.ok()) {
+            LOG(WARNING) << "failed to open segment. " << seg_path << ":" << 
s.to_string();
+            return Status::OLAPInternalError(OLAP_ERR_ROWSET_LOAD_FAILED);
+        }
+        segments->push_back(std::move(segment));
+    }
+    return Status::OK();
+}
+
+void BetaRowsetWriter::find_longest_consecutive_small_segment(
+        SegCompactionCandidatesSharedPtr segments) {
+    std::vector<segment_v2::SegmentSharedPtr> all_segments;
+    load_noncompacted_segments(&all_segments);
+
+    std::stringstream ss_all;
+    for (auto& segment : all_segments) {
+        ss_all << "[id:" << segment->id() << " num_rows:" << 
segment->num_rows() << "]";
+    }
+    LOG(INFO) << "all noncompacted segments num:" << all_segments.size()
+              << " list of segments:" << ss_all.str();
+
+    bool is_terminated_by_big = false;
+    bool let_big_terminate = false;
+    size_t small_threshold = config::segcompaction_small_threshold;
+    for (int64_t i = 0; i < all_segments.size(); ++i) {
+        segment_v2::SegmentSharedPtr seg = all_segments[i];
+        if (seg->num_rows() > small_threshold) {
+            if (let_big_terminate) {
+                is_terminated_by_big = true;
+                break;
+            } else {
+                rename_compacted_segment_plain(_segcompacted_point);
+                ++_segcompacted_point;
+            }
+        } else {
+            let_big_terminate = true; // break if find a big after small
+            segments->push_back(seg);
+            ++_segcompacted_point;
+        }
+    }
+    size_t s = segments->size();
+    if (!is_terminated_by_big && s <= 
(config::segcompaction_threshold_segment_num / 2)) {
+        // start with big segments and end with small, better to do it in next
+        // round to compact more at once
+        _segcompacted_point -= s;
+        segments->clear();
+        LOG(INFO) << "candidate segments num too small:" << s;

Review Comment:
   This should not be logged at INFO level: it is too frequent, and users will
be confused by seeing so many of these log lines...



##########
be/src/olap/rowset/beta_rowset_writer.cpp:
##########
@@ -102,6 +110,284 @@ Status BetaRowsetWriter::add_block(const 
vectorized::Block* block) {
     return _add_block(block, &_segment_writer);
 }
 
+vectorized::VMergeIterator* BetaRowsetWriter::get_segcompaction_reader(
+        SegCompactionCandidatesSharedPtr segments, std::shared_ptr<Schema> 
schema,
+        OlapReaderStatistics* stat) {
+    StorageReadOptions read_options;
+    read_options.stats = stat;
+    read_options.use_page_cache = false;
+    read_options.tablet_schema = _context.tablet_schema;
+    std::vector<std::unique_ptr<RowwiseIterator>> seg_iterators;
+    for (auto& seg_ptr : *segments) {
+        std::unique_ptr<RowwiseIterator> iter;
+        auto s = seg_ptr->new_iterator(*schema, read_options, &iter);
+        if (!s.ok()) {
+            LOG(WARNING) << "failed to create iterator[" << seg_ptr->id() << 
"]: " << s.to_string();
+        }
+        seg_iterators.push_back(std::move(iter));
+    }
+    std::vector<RowwiseIterator*> iterators;
+    for (auto& owned_it : seg_iterators) {
+        // transfer ownership
+        iterators.push_back(owned_it.release());
+    }
+    bool is_unique = (_context.tablet_schema->keys_type() == UNIQUE_KEYS);
+    bool is_reverse = false;
+    auto merge_itr = vectorized::new_merge_iterator(iterators, -1, is_unique, 
is_reverse, nullptr);
+    merge_itr->init(read_options);
+
+    return (vectorized::VMergeIterator*)merge_itr;
+}
+
+std::unique_ptr<segment_v2::SegmentWriter> 
BetaRowsetWriter::create_segcompaction_writer(
+        uint64_t begin, uint64_t end) {
+    Status status;
+    std::unique_ptr<segment_v2::SegmentWriter> writer = nullptr;
+    status = _create_segment_writer_for_segcompaction(&writer, begin, end);
+    if (status != Status::OK()) {
+        writer = nullptr;
+        LOG(ERROR) << "failed to create segment writer for begin:" << begin << 
" end:" << end
+                   << " path:" << writer->get_data_dir()->path();
+    }
+    if (writer->get_data_dir())
+        LOG(INFO) << "segcompaction segment writer created for begin:" << 
begin << " end:" << end
+                  << " path:" << writer->get_data_dir()->path();
+    return writer;
+}
+
+Status BetaRowsetWriter::delete_original_segments(uint32_t begin, uint32_t 
end) {
+    auto fs = _rowset_meta->fs();
+    if (!fs) {
+        return Status::OLAPInternalError(OLAP_ERR_INIT_FAILED);
+    }
+    for (uint32_t i = begin; i <= end; ++i) {
+        auto seg_path = BetaRowset::local_segment_path(_context.tablet_path, 
_context.rowset_id, i);
+        // Even if an error is encountered, these files that have not been 
cleaned up
+        // will be cleaned up by the GC background. So here we only print the 
error
+        // message when we encounter an error.
+        WARN_IF_ERROR(fs->delete_file(seg_path),
+                      strings::Substitute("Failed to delete file=$0", 
seg_path));
+    }
+    return Status::OK();
+}
+
+void BetaRowsetWriter::rename_compacted_segments(int64_t begin, int64_t end) {
+    int ret;
+    auto src_seg_path = 
BetaRowset::local_segment_path_segcompacted(_context.tablet_path,
+                                                                    
_context.rowset_id, begin, end);
+    auto dst_seg_path = BetaRowset::local_segment_path(_context.tablet_path, 
_context.rowset_id,
+                                                       _num_segcompacted++);
+    ret = rename(src_seg_path.c_str(), dst_seg_path.c_str());
+    DCHECK_EQ(ret, 0);
+}
+
+// todo: will rename only do the job? maybe need deep modification
+void BetaRowsetWriter::rename_compacted_segment_plain(uint64_t seg_id) {
+    int ret;
+    auto src_seg_path =
+            BetaRowset::local_segment_path(_context.tablet_path, 
_context.rowset_id, seg_id);
+    auto dst_seg_path = BetaRowset::local_segment_path(_context.tablet_path, 
_context.rowset_id,
+                                                       _num_segcompacted++);
+    LOG(INFO) << "segcompaction skip this segment. rename " << src_seg_path << 
" to "
+              << dst_seg_path;
+    if (src_seg_path.compare(dst_seg_path) != 0) {
+        CHECK_EQ(_segid_statistics_map.find(seg_id + 1) == 
_segid_statistics_map.end(), false);
+        CHECK_EQ(_segid_statistics_map.find(_num_segcompacted) == 
_segid_statistics_map.end(),
+                 true);
+        statistics org = _segid_statistics_map[seg_id + 1];
+        _segid_statistics_map.emplace(_num_segcompacted, org);
+        clear_statistics_for_deleting_segments(seg_id, seg_id);
+        ret = rename(src_seg_path.c_str(), dst_seg_path.c_str());
+        DCHECK_EQ(ret, 0);
+    }
+}
+
+void BetaRowsetWriter::clear_statistics_for_deleting_segments(uint64_t begin, 
uint64_t end) {
+    LOG(INFO) << "_segid_statistics_map clear record segid range from:" << 
begin + 1
+              << " to:" << end + 1;
+    for (int i = begin; i <= end; ++i) {
+        _segid_statistics_map.erase(i + 1);
+    }
+}
+
+Status BetaRowsetWriter::do_segcompaction(SegCompactionCandidatesSharedPtr 
segments) {
+    uint64_t begin = (*(segments->begin()))->id();
+    uint64_t end = (*(segments->end() - 1))->id();
+    LOG(INFO) << "BetaRowsetWriter:" << this << " do segcompaction at " << 
segments->size()
+              << " segments. Begin:" << begin << " End:" << end;
+    uint64_t begin_time = GetCurrentTimeMicros();
+
+    auto schema = std::make_shared<Schema>(_context.tablet_schema->columns(),
+                                           
_context.tablet_schema->columns().size());
+    std::unique_ptr<OlapReaderStatistics> stat(new OlapReaderStatistics());
+    vectorized::VMergeIterator* reader = get_segcompaction_reader(segments, 
schema, stat.get());
+    auto writer = create_segcompaction_writer(begin, end);
+    assert(writer != nullptr);
+    vectorized::Block block = _context.tablet_schema->create_block();
+    while (true) {
+        auto status = reader->next_batch(&block);
+        if (status != Status::OK()) {
+            assert(status.is_end_of_file());
+            break;
+        }
+        _add_block_for_segcompaction(&block, &writer);
+        block.clear_column_data();
+    }
+    clear_statistics_for_deleting_segments(begin, end);
+    RETURN_NOT_OK(_flush_segment_writer(&writer));
+    delete_original_segments(begin, end);
+    rename_compacted_segments(begin, end);
+
+    if (VLOG_DEBUG_IS_ON) {
+        std::stringstream ss;
+        for (const auto& entry : 
std::filesystem::directory_iterator(_context.tablet_path)) {
+            ss << "[" << entry.path() << "]";
+        }
+        VLOG_DEBUG << "_segcompacted_point:" << _segcompacted_point
+                   << " _num_segment:" << _num_segment << " 
_num_segcompacted:" << _num_segcompacted
+                   << " list directory:" << ss.str();
+    }
+
+    CHECK_EQ(_is_doing_segcompaction, true);
+    _is_doing_segcompaction = false;
+    _segcompacting_cond.notify_all();
+
+    uint64_t elapsed = GetCurrentTimeMicros() - begin_time;
+    LOG(INFO) << "BetaRowsetWriter:" << this << " segcompaction completed. 
elapsed time:" << elapsed
+              << "us";
+
+    return Status::OK();
+}
+
+Status BetaRowsetWriter::load_noncompacted_segments(
+        std::vector<segment_v2::SegmentSharedPtr>* segments) {
+    auto fs = _rowset_meta->fs();
+    if (!fs) {
+        return Status::OLAPInternalError(OLAP_ERR_INIT_FAILED);
+    }
+    for (int seg_id = _segcompacted_point; seg_id < _num_segment; ++seg_id) {
+        auto seg_path =
+                BetaRowset::local_segment_path(_context.tablet_path, 
_context.rowset_id, seg_id);
+        auto cache_path =
+                BetaRowset::local_cache_path(_context.tablet_path, 
_context.rowset_id, seg_id);
+        std::shared_ptr<segment_v2::Segment> segment;
+        auto s = segment_v2::Segment::open(fs, seg_path, cache_path, seg_id, 
_context.tablet_schema,
+                                           &segment);
+        if (!s.ok()) {
+            LOG(WARNING) << "failed to open segment. " << seg_path << ":" << 
s.to_string();
+            return Status::OLAPInternalError(OLAP_ERR_ROWSET_LOAD_FAILED);
+        }
+        segments->push_back(std::move(segment));
+    }
+    return Status::OK();
+}
+
+void BetaRowsetWriter::find_longest_consecutive_small_segment(
+        SegCompactionCandidatesSharedPtr segments) {
+    std::vector<segment_v2::SegmentSharedPtr> all_segments;
+    load_noncompacted_segments(&all_segments);
+
+    std::stringstream ss_all;
+    for (auto& segment : all_segments) {
+        ss_all << "[id:" << segment->id() << " num_rows:" << 
segment->num_rows() << "]";
+    }
+    LOG(INFO) << "all noncompacted segments num:" << all_segments.size()
+              << " list of segments:" << ss_all.str();
+
+    bool is_terminated_by_big = false;
+    bool let_big_terminate = false;
+    size_t small_threshold = config::segcompaction_small_threshold;
+    for (int64_t i = 0; i < all_segments.size(); ++i) {
+        segment_v2::SegmentSharedPtr seg = all_segments[i];
+        if (seg->num_rows() > small_threshold) {
+            if (let_big_terminate) {
+                is_terminated_by_big = true;
+                break;
+            } else {
+                rename_compacted_segment_plain(_segcompacted_point);
+                ++_segcompacted_point;
+            }
+        } else {
+            let_big_terminate = true; // break if find a big after small
+            segments->push_back(seg);
+            ++_segcompacted_point;
+        }
+    }
+    size_t s = segments->size();
+    if (!is_terminated_by_big && s <= 
(config::segcompaction_threshold_segment_num / 2)) {
+        // start with big segments and end with small, better to do it in next
+        // round to compact more at once
+        _segcompacted_point -= s;
+        segments->clear();
+        LOG(INFO) << "candidate segments num too small:" << s;
+        return;
+    }
+    if (s == 1) { // poor bachelor, let it go
+        LOG(INFO) << "only one candidate segment";
+        rename_compacted_segment_plain(_segcompacted_point - 1);

Review Comment:
   Please handle the error here.



##########
be/src/olap/rowset/beta_rowset_writer.h:
##########
@@ -73,42 +77,81 @@ class BetaRowsetWriter : public RowsetWriter {
         return Status::OK();
     }
 
+    Status do_segcompaction(SegCompactionCandidatesSharedPtr segments);
+
 private:
     template <typename RowType>
     Status _add_row(const RowType& row);
     Status _add_block(const vectorized::Block* block,
                       std::unique_ptr<segment_v2::SegmentWriter>* writer);
+    Status _add_block_for_segcompaction(const vectorized::Block* block,
+                                        
std::unique_ptr<segment_v2::SegmentWriter>* writer);
 
+    Status 
_do_create_segment_writer(std::unique_ptr<segment_v2::SegmentWriter>* writer,
+                                     bool is_segcompaction, int64_t begin, 
int64_t end);
     Status _create_segment_writer(std::unique_ptr<segment_v2::SegmentWriter>* 
writer);
+    Status _create_segment_writer_for_segcompaction(
+            std::unique_ptr<segment_v2::SegmentWriter>* writer, uint64_t 
begin, uint64_t end);
 
     Status _flush_segment_writer(std::unique_ptr<segment_v2::SegmentWriter>* 
writer);
     void _build_rowset_meta(std::shared_ptr<RowsetMeta> rowset_meta);
+    void segcompaction_if_necessary();
+    void segcompaction_ramaining_if_necessary();
+    vectorized::VMergeIterator* 
get_segcompaction_reader(SegCompactionCandidatesSharedPtr segments,
+                                                         
std::shared_ptr<Schema> schema,
+                                                         OlapReaderStatistics* 
stat);
+    std::unique_ptr<segment_v2::SegmentWriter> 
create_segcompaction_writer(uint64_t begin,
+                                                                           
uint64_t end);
+    Status delete_original_segments(uint32_t begin, uint32_t end);
+    void rename_compacted_segments(int64_t begin, int64_t end);
+    void rename_compacted_segment_plain(uint64_t seg_id);
+    Status 
load_noncompacted_segments(std::vector<segment_v2::SegmentSharedPtr>* segments);
+    void 
find_longest_consecutive_small_segment(SegCompactionCandidatesSharedPtr 
segments);
+    SegCompactionCandidatesSharedPtr get_segcompaction_candidates(bool 
is_last);
+    void wait_flying_segcompaction();
+    bool is_segcompacted() { return (_num_segcompacted > 0) ? true : false; }
+    void clear_statistics_for_deleting_segments(uint64_t begin, uint64_t end);
 
 private:
     RowsetWriterContext _context;
     std::shared_ptr<RowsetMeta> _rowset_meta;
 
     std::atomic<int32_t> _num_segment;
+    std::atomic<int32_t> _segcompacted_point; // segemnts before this point 
have
+                                              // already been segment compacted
+    std::atomic<int32_t> _num_segcompacted;   // index for segment compaction
     /// When flushing the memtable in the load process, we do not use this 
writer but an independent writer.
     /// Because we want to flush memtables in parallel.
     /// In other processes, such as merger or schema change, we will use this 
unified writer for data writing.
     std::unique_ptr<segment_v2::SegmentWriter> _segment_writer;
-    mutable SpinLock _lock; // lock to protect _wblocks.
-    std::vector<io::FileWriterPtr> _file_writers;
+    mutable SpinLock _lock;         // lock to protect _wblocks.
+    io::FileWriterPtr _file_writer; // writer for current active segment
+    io::FileWriterPtr _segcompaction_file_writer;
 
     // counters and statistics maintained during data write
     std::atomic<int64_t> _num_rows_written;
     std::atomic<int64_t> _total_data_size;
     std::atomic<int64_t> _total_index_size;
     // TODO rowset Zonemap
 
+    struct statistics {
+        int64_t row_num;
+        int64_t data_size;
+        int64_t index_size;
+        KeyBoundsPB key_bounds;
+    };
+    std::map<uint32_t, statistics> _segid_statistics_map;

Review Comment:
   `std::map` is not thread-safe; we should hold a lock when writing to it.



##########
be/src/olap/rowset/beta_rowset_writer.cpp:
##########
@@ -102,6 +110,284 @@ Status BetaRowsetWriter::add_block(const 
vectorized::Block* block) {
     return _add_block(block, &_segment_writer);
 }
 
+vectorized::VMergeIterator* BetaRowsetWriter::get_segcompaction_reader(
+        SegCompactionCandidatesSharedPtr segments, std::shared_ptr<Schema> 
schema,
+        OlapReaderStatistics* stat) {
+    StorageReadOptions read_options;
+    read_options.stats = stat;
+    read_options.use_page_cache = false;
+    read_options.tablet_schema = _context.tablet_schema;
+    std::vector<std::unique_ptr<RowwiseIterator>> seg_iterators;
+    for (auto& seg_ptr : *segments) {
+        std::unique_ptr<RowwiseIterator> iter;
+        auto s = seg_ptr->new_iterator(*schema, read_options, &iter);
+        if (!s.ok()) {
+            LOG(WARNING) << "failed to create iterator[" << seg_ptr->id() << 
"]: " << s.to_string();
+        }
+        seg_iterators.push_back(std::move(iter));
+    }
+    std::vector<RowwiseIterator*> iterators;
+    for (auto& owned_it : seg_iterators) {
+        // transfer ownership
+        iterators.push_back(owned_it.release());
+    }
+    bool is_unique = (_context.tablet_schema->keys_type() == UNIQUE_KEYS);
+    bool is_reverse = false;
+    auto merge_itr = vectorized::new_merge_iterator(iterators, -1, is_unique, 
is_reverse, nullptr);
+    merge_itr->init(read_options);
+
+    return (vectorized::VMergeIterator*)merge_itr;
+}
+
+std::unique_ptr<segment_v2::SegmentWriter> 
BetaRowsetWriter::create_segcompaction_writer(
+        uint64_t begin, uint64_t end) {
+    Status status;
+    std::unique_ptr<segment_v2::SegmentWriter> writer = nullptr;
+    status = _create_segment_writer_for_segcompaction(&writer, begin, end);
+    if (status != Status::OK()) {
+        writer = nullptr;
+        LOG(ERROR) << "failed to create segment writer for begin:" << begin << 
" end:" << end
+                   << " path:" << writer->get_data_dir()->path();
+    }
+    if (writer->get_data_dir())
+        LOG(INFO) << "segcompaction segment writer created for begin:" << 
begin << " end:" << end
+                  << " path:" << writer->get_data_dir()->path();
+    return writer;
+}
+
+Status BetaRowsetWriter::delete_original_segments(uint32_t begin, uint32_t 
end) {
+    auto fs = _rowset_meta->fs();
+    if (!fs) {
+        return Status::OLAPInternalError(OLAP_ERR_INIT_FAILED);
+    }
+    for (uint32_t i = begin; i <= end; ++i) {
+        auto seg_path = BetaRowset::local_segment_path(_context.tablet_path, 
_context.rowset_id, i);
+        // Even if an error is encountered, these files that have not been 
cleaned up
+        // will be cleaned up by the GC background. So here we only print the 
error
+        // message when we encounter an error.
+        WARN_IF_ERROR(fs->delete_file(seg_path),
+                      strings::Substitute("Failed to delete file=$0", 
seg_path));
+    }
+    return Status::OK();
+}
+
+void BetaRowsetWriter::rename_compacted_segments(int64_t begin, int64_t end) {

Review Comment:
   If this method fails, we should not allow the load process to succeed.
   Should this method return a `Status` as well?



##########
be/src/olap/rowset/beta_rowset_writer.h:
##########
@@ -73,42 +77,81 @@ class BetaRowsetWriter : public RowsetWriter {
         return Status::OK();
     }
 
+    Status do_segcompaction(SegCompactionCandidatesSharedPtr segments);
+
 private:
     template <typename RowType>
     Status _add_row(const RowType& row);
     Status _add_block(const vectorized::Block* block,
                       std::unique_ptr<segment_v2::SegmentWriter>* writer);
+    Status _add_block_for_segcompaction(const vectorized::Block* block,
+                                        
std::unique_ptr<segment_v2::SegmentWriter>* writer);
 
+    Status 
_do_create_segment_writer(std::unique_ptr<segment_v2::SegmentWriter>* writer,
+                                     bool is_segcompaction, int64_t begin, 
int64_t end);
     Status _create_segment_writer(std::unique_ptr<segment_v2::SegmentWriter>* 
writer);
+    Status _create_segment_writer_for_segcompaction(
+            std::unique_ptr<segment_v2::SegmentWriter>* writer, uint64_t 
begin, uint64_t end);
 
     Status _flush_segment_writer(std::unique_ptr<segment_v2::SegmentWriter>* 
writer);
     void _build_rowset_meta(std::shared_ptr<RowsetMeta> rowset_meta);
+    void segcompaction_if_necessary();

Review Comment:
   Private methods should start with `_`.



##########
be/src/olap/rowset/beta_rowset_writer.cpp:
##########
@@ -309,12 +641,23 @@ Status BetaRowsetWriter::_create_segment_writer(
     DCHECK(file_writer != nullptr);
     segment_v2::SegmentWriterOptions writer_options;
     writer_options.enable_unique_key_merge_on_write = 
_context.enable_unique_key_merge_on_write;
-    writer->reset(new segment_v2::SegmentWriter(file_writer.get(), 
_num_segment,
-                                                _context.tablet_schema, 
_context.data_dir,
-                                                _context.max_rows_per_segment, 
writer_options));
-    {
-        std::lock_guard<SpinLock> l(_lock);
-        _file_writers.push_back(std::move(file_writer));
+
+    if (is_segcompaction) {
+        writer->reset(new segment_v2::SegmentWriter(file_writer.get(), 
_num_segcompacted + 1,

Review Comment:
   Is the only difference between these two branches the `segment_id` parameter?



##########
be/src/olap/rowset/beta_rowset_writer.cpp:
##########
@@ -102,6 +110,284 @@ Status BetaRowsetWriter::add_block(const 
vectorized::Block* block) {
     return _add_block(block, &_segment_writer);
 }
 
+vectorized::VMergeIterator* BetaRowsetWriter::get_segcompaction_reader(
+        SegCompactionCandidatesSharedPtr segments, std::shared_ptr<Schema> 
schema,
+        OlapReaderStatistics* stat) {
+    StorageReadOptions read_options;
+    read_options.stats = stat;
+    read_options.use_page_cache = false;
+    read_options.tablet_schema = _context.tablet_schema;
+    std::vector<std::unique_ptr<RowwiseIterator>> seg_iterators;
+    for (auto& seg_ptr : *segments) {
+        std::unique_ptr<RowwiseIterator> iter;
+        auto s = seg_ptr->new_iterator(*schema, read_options, &iter);
+        if (!s.ok()) {
+            LOG(WARNING) << "failed to create iterator[" << seg_ptr->id() << 
"]: " << s.to_string();
+        }
+        seg_iterators.push_back(std::move(iter));
+    }
+    std::vector<RowwiseIterator*> iterators;
+    for (auto& owned_it : seg_iterators) {
+        // transfer ownership
+        iterators.push_back(owned_it.release());
+    }
+    bool is_unique = (_context.tablet_schema->keys_type() == UNIQUE_KEYS);
+    bool is_reverse = false;
+    auto merge_itr = vectorized::new_merge_iterator(iterators, -1, is_unique, 
is_reverse, nullptr);
+    merge_itr->init(read_options);
+
+    return (vectorized::VMergeIterator*)merge_itr;
+}
+
+std::unique_ptr<segment_v2::SegmentWriter> 
BetaRowsetWriter::create_segcompaction_writer(
+        uint64_t begin, uint64_t end) {
+    Status status;
+    std::unique_ptr<segment_v2::SegmentWriter> writer = nullptr;
+    status = _create_segment_writer_for_segcompaction(&writer, begin, end);
+    if (status != Status::OK()) {
+        writer = nullptr;
+        LOG(ERROR) << "failed to create segment writer for begin:" << begin << 
" end:" << end
+                   << " path:" << writer->get_data_dir()->path();
+    }
+    if (writer->get_data_dir())
+        LOG(INFO) << "segcompaction segment writer created for begin:" << 
begin << " end:" << end
+                  << " path:" << writer->get_data_dir()->path();
+    return writer;
+}
+
+Status BetaRowsetWriter::delete_original_segments(uint32_t begin, uint32_t 
end) {
+    auto fs = _rowset_meta->fs();
+    if (!fs) {
+        return Status::OLAPInternalError(OLAP_ERR_INIT_FAILED);
+    }
+    for (uint32_t i = begin; i <= end; ++i) {
+        auto seg_path = BetaRowset::local_segment_path(_context.tablet_path, 
_context.rowset_id, i);
+        // Even if an error is encountered, these files that have not been 
cleaned up
+        // will be cleaned up by the GC background. So here we only print the 
error
+        // message when we encounter an error.
+        WARN_IF_ERROR(fs->delete_file(seg_path),
+                      strings::Substitute("Failed to delete file=$0", 
seg_path));
+    }
+    return Status::OK();
+}
+
+void BetaRowsetWriter::rename_compacted_segments(int64_t begin, int64_t end) {
+    int ret;
+    auto src_seg_path = 
BetaRowset::local_segment_path_segcompacted(_context.tablet_path,
+                                                                    
_context.rowset_id, begin, end);
+    auto dst_seg_path = BetaRowset::local_segment_path(_context.tablet_path, 
_context.rowset_id,
+                                                       _num_segcompacted++);
+    ret = rename(src_seg_path.c_str(), dst_seg_path.c_str());
+    DCHECK_EQ(ret, 0);
+}
+
+// todo: will rename only do the job? maybe need deep modification
+void BetaRowsetWriter::rename_compacted_segment_plain(uint64_t seg_id) {
+    int ret;
+    auto src_seg_path =
+            BetaRowset::local_segment_path(_context.tablet_path, 
_context.rowset_id, seg_id);
+    auto dst_seg_path = BetaRowset::local_segment_path(_context.tablet_path, 
_context.rowset_id,
+                                                       _num_segcompacted++);
+    LOG(INFO) << "segcompaction skip this segment. rename " << src_seg_path << 
" to "
+              << dst_seg_path;
+    if (src_seg_path.compare(dst_seg_path) != 0) {
+        CHECK_EQ(_segid_statistics_map.find(seg_id + 1) == 
_segid_statistics_map.end(), false);
+        CHECK_EQ(_segid_statistics_map.find(_num_segcompacted) == 
_segid_statistics_map.end(),
+                 true);
+        statistics org = _segid_statistics_map[seg_id + 1];
+        _segid_statistics_map.emplace(_num_segcompacted, org);
+        clear_statistics_for_deleting_segments(seg_id, seg_id);
+        ret = rename(src_seg_path.c_str(), dst_seg_path.c_str());
+        DCHECK_EQ(ret, 0);
+    }
+}
+
+void BetaRowsetWriter::clear_statistics_for_deleting_segments(uint64_t begin, 
uint64_t end) {
+    LOG(INFO) << "_segid_statistics_map clear record segid range from:" << 
begin + 1
+              << " to:" << end + 1;
+    for (int i = begin; i <= end; ++i) {
+        _segid_statistics_map.erase(i + 1);
+    }
+}
+
+Status BetaRowsetWriter::do_segcompaction(SegCompactionCandidatesSharedPtr 
segments) {
+    uint64_t begin = (*(segments->begin()))->id();
+    uint64_t end = (*(segments->end() - 1))->id();
+    LOG(INFO) << "BetaRowsetWriter:" << this << " do segcompaction at " << 
segments->size()
+              << " segments. Begin:" << begin << " End:" << end;
+    uint64_t begin_time = GetCurrentTimeMicros();
+
+    auto schema = std::make_shared<Schema>(_context.tablet_schema->columns(),
+                                           
_context.tablet_schema->columns().size());
+    std::unique_ptr<OlapReaderStatistics> stat(new OlapReaderStatistics());
+    vectorized::VMergeIterator* reader = get_segcompaction_reader(segments, 
schema, stat.get());
+    auto writer = create_segcompaction_writer(begin, end);
+    assert(writer != nullptr);
+    vectorized::Block block = _context.tablet_schema->create_block();
+    while (true) {
+        auto status = reader->next_batch(&block);
+        if (status != Status::OK()) {
+            assert(status.is_end_of_file());
+            break;
+        }
+        _add_block_for_segcompaction(&block, &writer);
+        block.clear_column_data();
+    }
+    clear_statistics_for_deleting_segments(begin, end);
+    RETURN_NOT_OK(_flush_segment_writer(&writer));
+    delete_original_segments(begin, end);
+    rename_compacted_segments(begin, end);
+
+    if (VLOG_DEBUG_IS_ON) {
+        std::stringstream ss;
+        for (const auto& entry : 
std::filesystem::directory_iterator(_context.tablet_path)) {
+            ss << "[" << entry.path() << "]";
+        }
+        VLOG_DEBUG << "_segcompacted_point:" << _segcompacted_point
+                   << " _num_segment:" << _num_segment << " 
_num_segcompacted:" << _num_segcompacted
+                   << " list directory:" << ss.str();
+    }
+
+    CHECK_EQ(_is_doing_segcompaction, true);
+    _is_doing_segcompaction = false;
+    _segcompacting_cond.notify_all();
+
+    uint64_t elapsed = GetCurrentTimeMicros() - begin_time;
+    LOG(INFO) << "BetaRowsetWriter:" << this << " segcompaction completed. 
elapsed time:" << elapsed
+              << "us";
+
+    return Status::OK();
+}
+
+Status BetaRowsetWriter::load_noncompacted_segments(
+        std::vector<segment_v2::SegmentSharedPtr>* segments) {
+    auto fs = _rowset_meta->fs();
+    if (!fs) {
+        return Status::OLAPInternalError(OLAP_ERR_INIT_FAILED);

Review Comment:
   The returned error status seems to be unused (the caller ignores it)?



##########
be/src/olap/rowset/beta_rowset_writer.cpp:
##########
@@ -102,6 +110,284 @@ Status BetaRowsetWriter::add_block(const 
vectorized::Block* block) {
     return _add_block(block, &_segment_writer);
 }
 
+vectorized::VMergeIterator* BetaRowsetWriter::get_segcompaction_reader(
+        SegCompactionCandidatesSharedPtr segments, std::shared_ptr<Schema> 
schema,
+        OlapReaderStatistics* stat) {
+    StorageReadOptions read_options;
+    read_options.stats = stat;
+    read_options.use_page_cache = false;
+    read_options.tablet_schema = _context.tablet_schema;
+    std::vector<std::unique_ptr<RowwiseIterator>> seg_iterators;
+    for (auto& seg_ptr : *segments) {
+        std::unique_ptr<RowwiseIterator> iter;
+        auto s = seg_ptr->new_iterator(*schema, read_options, &iter);
+        if (!s.ok()) {
+            LOG(WARNING) << "failed to create iterator[" << seg_ptr->id() << 
"]: " << s.to_string();
+        }
+        seg_iterators.push_back(std::move(iter));
+    }
+    std::vector<RowwiseIterator*> iterators;
+    for (auto& owned_it : seg_iterators) {
+        // transfer ownership
+        iterators.push_back(owned_it.release());
+    }
+    bool is_unique = (_context.tablet_schema->keys_type() == UNIQUE_KEYS);
+    bool is_reverse = false;
+    auto merge_itr = vectorized::new_merge_iterator(iterators, -1, is_unique, 
is_reverse, nullptr);
+    merge_itr->init(read_options);
+
+    return (vectorized::VMergeIterator*)merge_itr;
+}
+
+std::unique_ptr<segment_v2::SegmentWriter> 
BetaRowsetWriter::create_segcompaction_writer(
+        uint64_t begin, uint64_t end) {
+    Status status;
+    std::unique_ptr<segment_v2::SegmentWriter> writer = nullptr;
+    status = _create_segment_writer_for_segcompaction(&writer, begin, end);
+    if (status != Status::OK()) {
+        writer = nullptr;
+        LOG(ERROR) << "failed to create segment writer for begin:" << begin << 
" end:" << end
+                   << " path:" << writer->get_data_dir()->path();
+    }
+    if (writer->get_data_dir())
+        LOG(INFO) << "segcompaction segment writer created for begin:" << 
begin << " end:" << end
+                  << " path:" << writer->get_data_dir()->path();
+    return writer;
+}
+
+Status BetaRowsetWriter::delete_original_segments(uint32_t begin, uint32_t 
end) {
+    auto fs = _rowset_meta->fs();
+    if (!fs) {
+        return Status::OLAPInternalError(OLAP_ERR_INIT_FAILED);
+    }
+    for (uint32_t i = begin; i <= end; ++i) {
+        auto seg_path = BetaRowset::local_segment_path(_context.tablet_path, 
_context.rowset_id, i);
+        // Even if an error is encountered, these files that have not been 
cleaned up
+        // will be cleaned up by the GC background. So here we only print the 
error
+        // message when we encounter an error.
+        WARN_IF_ERROR(fs->delete_file(seg_path),
+                      strings::Substitute("Failed to delete file=$0", 
seg_path));
+    }
+    return Status::OK();
+}
+
+void BetaRowsetWriter::rename_compacted_segments(int64_t begin, int64_t end) {
+    int ret;
+    auto src_seg_path = 
BetaRowset::local_segment_path_segcompacted(_context.tablet_path,
+                                                                    
_context.rowset_id, begin, end);
+    auto dst_seg_path = BetaRowset::local_segment_path(_context.tablet_path, 
_context.rowset_id,
+                                                       _num_segcompacted++);
+    ret = rename(src_seg_path.c_str(), dst_seg_path.c_str());
+    DCHECK_EQ(ret, 0);
+}
+
+// todo: will rename only do the job? maybe need deep modification
+void BetaRowsetWriter::rename_compacted_segment_plain(uint64_t seg_id) {
+    int ret;
+    auto src_seg_path =
+            BetaRowset::local_segment_path(_context.tablet_path, 
_context.rowset_id, seg_id);
+    auto dst_seg_path = BetaRowset::local_segment_path(_context.tablet_path, 
_context.rowset_id,
+                                                       _num_segcompacted++);
+    LOG(INFO) << "segcompaction skip this segment. rename " << src_seg_path << 
" to "
+              << dst_seg_path;
+    if (src_seg_path.compare(dst_seg_path) != 0) {
+        CHECK_EQ(_segid_statistics_map.find(seg_id + 1) == 
_segid_statistics_map.end(), false);
+        CHECK_EQ(_segid_statistics_map.find(_num_segcompacted) == 
_segid_statistics_map.end(),
+                 true);
+        statistics org = _segid_statistics_map[seg_id + 1];
+        _segid_statistics_map.emplace(_num_segcompacted, org);
+        clear_statistics_for_deleting_segments(seg_id, seg_id);
+        ret = rename(src_seg_path.c_str(), dst_seg_path.c_str());
+        DCHECK_EQ(ret, 0);
+    }
+}
+
+void BetaRowsetWriter::clear_statistics_for_deleting_segments(uint64_t begin, 
uint64_t end) {
+    LOG(INFO) << "_segid_statistics_map clear record segid range from:" << 
begin + 1
+              << " to:" << end + 1;
+    for (int i = begin; i <= end; ++i) {
+        _segid_statistics_map.erase(i + 1);
+    }
+}
+
+Status BetaRowsetWriter::do_segcompaction(SegCompactionCandidatesSharedPtr 
segments) {
+    uint64_t begin = (*(segments->begin()))->id();
+    uint64_t end = (*(segments->end() - 1))->id();
+    LOG(INFO) << "BetaRowsetWriter:" << this << " do segcompaction at " << 
segments->size()
+              << " segments. Begin:" << begin << " End:" << end;
+    uint64_t begin_time = GetCurrentTimeMicros();
+
+    auto schema = std::make_shared<Schema>(_context.tablet_schema->columns(),
+                                           
_context.tablet_schema->columns().size());
+    std::unique_ptr<OlapReaderStatistics> stat(new OlapReaderStatistics());
+    vectorized::VMergeIterator* reader = get_segcompaction_reader(segments, 
schema, stat.get());
+    auto writer = create_segcompaction_writer(begin, end);
+    assert(writer != nullptr);
+    vectorized::Block block = _context.tablet_schema->create_block();
+    while (true) {
+        auto status = reader->next_batch(&block);
+        if (status != Status::OK()) {
+            assert(status.is_end_of_file());
+            break;
+        }
+        _add_block_for_segcompaction(&block, &writer);
+        block.clear_column_data();
+    }
+    clear_statistics_for_deleting_segments(begin, end);
+    RETURN_NOT_OK(_flush_segment_writer(&writer));
+    delete_original_segments(begin, end);
+    rename_compacted_segments(begin, end);
+
+    if (VLOG_DEBUG_IS_ON) {
+        std::stringstream ss;
+        for (const auto& entry : 
std::filesystem::directory_iterator(_context.tablet_path)) {
+            ss << "[" << entry.path() << "]";
+        }
+        VLOG_DEBUG << "_segcompacted_point:" << _segcompacted_point
+                   << " _num_segment:" << _num_segment << " 
_num_segcompacted:" << _num_segcompacted
+                   << " list directory:" << ss.str();
+    }
+
+    CHECK_EQ(_is_doing_segcompaction, true);
+    _is_doing_segcompaction = false;
+    _segcompacting_cond.notify_all();
+
+    uint64_t elapsed = GetCurrentTimeMicros() - begin_time;
+    LOG(INFO) << "BetaRowsetWriter:" << this << " segcompaction completed. 
elapsed time:" << elapsed
+              << "us";
+
+    return Status::OK();
+}
+
+Status BetaRowsetWriter::load_noncompacted_segments(
+        std::vector<segment_v2::SegmentSharedPtr>* segments) {
+    auto fs = _rowset_meta->fs();
+    if (!fs) {
+        return Status::OLAPInternalError(OLAP_ERR_INIT_FAILED);
+    }
+    for (int seg_id = _segcompacted_point; seg_id < _num_segment; ++seg_id) {
+        auto seg_path =
+                BetaRowset::local_segment_path(_context.tablet_path, 
_context.rowset_id, seg_id);
+        auto cache_path =
+                BetaRowset::local_cache_path(_context.tablet_path, 
_context.rowset_id, seg_id);
+        std::shared_ptr<segment_v2::Segment> segment;
+        auto s = segment_v2::Segment::open(fs, seg_path, cache_path, seg_id, 
_context.tablet_schema,
+                                           &segment);
+        if (!s.ok()) {
+            LOG(WARNING) << "failed to open segment. " << seg_path << ":" << 
s.to_string();
+            return Status::OLAPInternalError(OLAP_ERR_ROWSET_LOAD_FAILED);
+        }
+        segments->push_back(std::move(segment));
+    }
+    return Status::OK();
+}
+
+void BetaRowsetWriter::find_longest_consecutive_small_segment(
+        SegCompactionCandidatesSharedPtr segments) {
+    std::vector<segment_v2::SegmentSharedPtr> all_segments;
+    load_noncompacted_segments(&all_segments);
+
+    std::stringstream ss_all;
+    for (auto& segment : all_segments) {
+        ss_all << "[id:" << segment->id() << " num_rows:" << 
segment->num_rows() << "]";
+    }
+    LOG(INFO) << "all noncompacted segments num:" << all_segments.size()
+              << " list of segments:" << ss_all.str();
+
+    bool is_terminated_by_big = false;
+    bool let_big_terminate = false;
+    size_t small_threshold = config::segcompaction_small_threshold;
+    for (int64_t i = 0; i < all_segments.size(); ++i) {
+        segment_v2::SegmentSharedPtr seg = all_segments[i];
+        if (seg->num_rows() > small_threshold) {
+            if (let_big_terminate) {
+                is_terminated_by_big = true;
+                break;
+            } else {
+                rename_compacted_segment_plain(_segcompacted_point);
+                ++_segcompacted_point;
+            }
+        } else {
+            let_big_terminate = true; // break if find a big after small
+            segments->push_back(seg);
+            ++_segcompacted_point;

Review Comment:
   Would it be better to update `_segcompacted_point` after the segment
   compaction task has succeeded?
   If you update this variable and the task fails, you will have no chance to
   retry on the segments you have just chosen.



##########
be/src/olap/rowset/beta_rowset_writer.cpp:
##########
@@ -102,6 +110,284 @@ Status BetaRowsetWriter::add_block(const 
vectorized::Block* block) {
     return _add_block(block, &_segment_writer);
 }
 
+vectorized::VMergeIterator* BetaRowsetWriter::get_segcompaction_reader(
+        SegCompactionCandidatesSharedPtr segments, std::shared_ptr<Schema> 
schema,
+        OlapReaderStatistics* stat) {
+    StorageReadOptions read_options;
+    read_options.stats = stat;
+    read_options.use_page_cache = false;
+    read_options.tablet_schema = _context.tablet_schema;
+    std::vector<std::unique_ptr<RowwiseIterator>> seg_iterators;
+    for (auto& seg_ptr : *segments) {
+        std::unique_ptr<RowwiseIterator> iter;
+        auto s = seg_ptr->new_iterator(*schema, read_options, &iter);
+        if (!s.ok()) {
+            LOG(WARNING) << "failed to create iterator[" << seg_ptr->id() << 
"]: " << s.to_string();
+        }
+        seg_iterators.push_back(std::move(iter));
+    }
+    std::vector<RowwiseIterator*> iterators;
+    for (auto& owned_it : seg_iterators) {
+        // transfer ownership
+        iterators.push_back(owned_it.release());
+    }
+    bool is_unique = (_context.tablet_schema->keys_type() == UNIQUE_KEYS);
+    bool is_reverse = false;
+    auto merge_itr = vectorized::new_merge_iterator(iterators, -1, is_unique, 
is_reverse, nullptr);
+    merge_itr->init(read_options);
+
+    return (vectorized::VMergeIterator*)merge_itr;
+}
+
+std::unique_ptr<segment_v2::SegmentWriter> 
BetaRowsetWriter::create_segcompaction_writer(
+        uint64_t begin, uint64_t end) {
+    Status status;
+    std::unique_ptr<segment_v2::SegmentWriter> writer = nullptr;
+    status = _create_segment_writer_for_segcompaction(&writer, begin, end);
+    if (status != Status::OK()) {
+        writer = nullptr;
+        LOG(ERROR) << "failed to create segment writer for begin:" << begin << 
" end:" << end
+                   << " path:" << writer->get_data_dir()->path();
+    }
+    if (writer->get_data_dir())
+        LOG(INFO) << "segcompaction segment writer created for begin:" << 
begin << " end:" << end
+                  << " path:" << writer->get_data_dir()->path();
+    return writer;
+}
+
+Status BetaRowsetWriter::delete_original_segments(uint32_t begin, uint32_t 
end) {
+    auto fs = _rowset_meta->fs();
+    if (!fs) {
+        return Status::OLAPInternalError(OLAP_ERR_INIT_FAILED);
+    }
+    for (uint32_t i = begin; i <= end; ++i) {
+        auto seg_path = BetaRowset::local_segment_path(_context.tablet_path, 
_context.rowset_id, i);
+        // Even if an error is encountered, these files that have not been 
cleaned up
+        // will be cleaned up by the GC background. So here we only print the 
error
+        // message when we encounter an error.
+        WARN_IF_ERROR(fs->delete_file(seg_path),
+                      strings::Substitute("Failed to delete file=$0", 
seg_path));
+    }
+    return Status::OK();
+}
+
+void BetaRowsetWriter::rename_compacted_segments(int64_t begin, int64_t end) {
+    int ret;
+    auto src_seg_path = 
BetaRowset::local_segment_path_segcompacted(_context.tablet_path,
+                                                                    
_context.rowset_id, begin, end);
+    auto dst_seg_path = BetaRowset::local_segment_path(_context.tablet_path, 
_context.rowset_id,
+                                                       _num_segcompacted++);
+    ret = rename(src_seg_path.c_str(), dst_seg_path.c_str());
+    DCHECK_EQ(ret, 0);
+}
+
+// todo: will rename only do the job? maybe need deep modification
+void BetaRowsetWriter::rename_compacted_segment_plain(uint64_t seg_id) {
+    int ret;
+    auto src_seg_path =
+            BetaRowset::local_segment_path(_context.tablet_path, 
_context.rowset_id, seg_id);
+    auto dst_seg_path = BetaRowset::local_segment_path(_context.tablet_path, 
_context.rowset_id,
+                                                       _num_segcompacted++);
+    LOG(INFO) << "segcompaction skip this segment. rename " << src_seg_path << 
" to "
+              << dst_seg_path;
+    if (src_seg_path.compare(dst_seg_path) != 0) {
+        CHECK_EQ(_segid_statistics_map.find(seg_id + 1) == 
_segid_statistics_map.end(), false);
+        CHECK_EQ(_segid_statistics_map.find(_num_segcompacted) == 
_segid_statistics_map.end(),
+                 true);
+        statistics org = _segid_statistics_map[seg_id + 1];
+        _segid_statistics_map.emplace(_num_segcompacted, org);
+        clear_statistics_for_deleting_segments(seg_id, seg_id);
+        ret = rename(src_seg_path.c_str(), dst_seg_path.c_str());
+        DCHECK_EQ(ret, 0);
+    }
+}
+
+void BetaRowsetWriter::clear_statistics_for_deleting_segments(uint64_t begin, 
uint64_t end) {
+    LOG(INFO) << "_segid_statistics_map clear record segid range from:" << 
begin + 1
+              << " to:" << end + 1;
+    for (int i = begin; i <= end; ++i) {
+        _segid_statistics_map.erase(i + 1);
+    }
+}
+
+Status BetaRowsetWriter::do_segcompaction(SegCompactionCandidatesSharedPtr 
segments) {
+    uint64_t begin = (*(segments->begin()))->id();
+    uint64_t end = (*(segments->end() - 1))->id();
+    LOG(INFO) << "BetaRowsetWriter:" << this << " do segcompaction at " << 
segments->size()
+              << " segments. Begin:" << begin << " End:" << end;
+    uint64_t begin_time = GetCurrentTimeMicros();
+
+    auto schema = std::make_shared<Schema>(_context.tablet_schema->columns(),
+                                           
_context.tablet_schema->columns().size());
+    std::unique_ptr<OlapReaderStatistics> stat(new OlapReaderStatistics());
+    vectorized::VMergeIterator* reader = get_segcompaction_reader(segments, 
schema, stat.get());
+    auto writer = create_segcompaction_writer(begin, end);
+    assert(writer != nullptr);
+    vectorized::Block block = _context.tablet_schema->create_block();
+    while (true) {
+        auto status = reader->next_batch(&block);
+        if (status != Status::OK()) {
+            assert(status.is_end_of_file());
+            break;
+        }
+        _add_block_for_segcompaction(&block, &writer);
+        block.clear_column_data();
+    }
+    clear_statistics_for_deleting_segments(begin, end);
+    RETURN_NOT_OK(_flush_segment_writer(&writer));
+    delete_original_segments(begin, end);
+    rename_compacted_segments(begin, end);
+
+    if (VLOG_DEBUG_IS_ON) {
+        std::stringstream ss;
+        for (const auto& entry : 
std::filesystem::directory_iterator(_context.tablet_path)) {
+            ss << "[" << entry.path() << "]";
+        }
+        VLOG_DEBUG << "_segcompacted_point:" << _segcompacted_point
+                   << " _num_segment:" << _num_segment << " 
_num_segcompacted:" << _num_segcompacted
+                   << " list directory:" << ss.str();
+    }
+
+    CHECK_EQ(_is_doing_segcompaction, true);
+    _is_doing_segcompaction = false;
+    _segcompacting_cond.notify_all();
+
+    uint64_t elapsed = GetCurrentTimeMicros() - begin_time;
+    LOG(INFO) << "BetaRowsetWriter:" << this << " segcompaction completed. 
elapsed time:" << elapsed
+              << "us";
+
+    return Status::OK();
+}
+
+Status BetaRowsetWriter::load_noncompacted_segments(
+        std::vector<segment_v2::SegmentSharedPtr>* segments) {
+    auto fs = _rowset_meta->fs();
+    if (!fs) {
+        return Status::OLAPInternalError(OLAP_ERR_INIT_FAILED);
+    }
+    for (int seg_id = _segcompacted_point; seg_id < _num_segment; ++seg_id) {
+        auto seg_path =
+                BetaRowset::local_segment_path(_context.tablet_path, 
_context.rowset_id, seg_id);
+        auto cache_path =
+                BetaRowset::local_cache_path(_context.tablet_path, 
_context.rowset_id, seg_id);
+        std::shared_ptr<segment_v2::Segment> segment;
+        auto s = segment_v2::Segment::open(fs, seg_path, cache_path, seg_id, 
_context.tablet_schema,
+                                           &segment);
+        if (!s.ok()) {
+            LOG(WARNING) << "failed to open segment. " << seg_path << ":" << 
s.to_string();
+            return Status::OLAPInternalError(OLAP_ERR_ROWSET_LOAD_FAILED);
+        }
+        segments->push_back(std::move(segment));
+    }
+    return Status::OK();
+}
+
+void BetaRowsetWriter::find_longest_consecutive_small_segment(
+        SegCompactionCandidatesSharedPtr segments) {
+    std::vector<segment_v2::SegmentSharedPtr> all_segments;
+    load_noncompacted_segments(&all_segments);
+
+    std::stringstream ss_all;
+    for (auto& segment : all_segments) {
+        ss_all << "[id:" << segment->id() << " num_rows:" << 
segment->num_rows() << "]";
+    }
+    LOG(INFO) << "all noncompacted segments num:" << all_segments.size()
+              << " list of segments:" << ss_all.str();
+
+    bool is_terminated_by_big = false;
+    bool let_big_terminate = false;
+    size_t small_threshold = config::segcompaction_small_threshold;
+    for (int64_t i = 0; i < all_segments.size(); ++i) {
+        segment_v2::SegmentSharedPtr seg = all_segments[i];
+        if (seg->num_rows() > small_threshold) {
+            if (let_big_terminate) {
+                is_terminated_by_big = true;
+                break;
+            } else {
+                rename_compacted_segment_plain(_segcompacted_point);
+                ++_segcompacted_point;
+            }
+        } else {
+            let_big_terminate = true; // break if find a big after small
+            segments->push_back(seg);
+            ++_segcompacted_point;
+        }
+    }
+    size_t s = segments->size();
+    if (!is_terminated_by_big && s <= 
(config::segcompaction_threshold_segment_num / 2)) {
+        // start with big segments and end with small, better to do it in next
+        // round to compact more at once
+        _segcompacted_point -= s;
+        segments->clear();
+        LOG(INFO) << "candidate segments num too small:" << s;
+        return;
+    }
+    if (s == 1) { // poor bachelor, let it go
+        LOG(INFO) << "only one candidate segment";
+        rename_compacted_segment_plain(_segcompacted_point - 1);
+        segments->clear();
+        return;
+    }
+    std::stringstream ss;
+    for (auto& segment : (*segments.get())) {
+        ss << "[id:" << segment->id() << " num_rows:" << segment->num_rows() 
<< "]";
+    }
+    LOG(INFO) << "candidate segments num:" << s << " list of candidates:" << 
ss.str();
+}
+
+SegCompactionCandidatesSharedPtr 
BetaRowsetWriter::get_segcompaction_candidates(bool is_last) {
+    SegCompactionCandidatesSharedPtr segments = 
std::make_shared<SegCompactionCandidates>();
+    if (is_last) {
+        load_noncompacted_segments(segments.get());
+        if (segments->size() == 1) {
+            LOG(INFO) << "only one last candidate segment";
+            rename_compacted_segment_plain(_segcompacted_point);
+            segments->clear();
+        }
+    } else {
+        find_longest_consecutive_small_segment(segments);
+    }
+    return segments;
+}
+
+void BetaRowsetWriter::segcompaction_if_necessary() {
+    if (!config::enable_segcompaction || 
!config::enable_storage_vectorization) {
+        return;
+    }
+    if (!_is_doing_segcompaction &&

Review Comment:
   Can two threads both see `_is_doing_segcompaction` as false and both enter the
   following block?



##########
be/src/olap/rowset/beta_rowset_writer.cpp:
##########
@@ -102,6 +110,284 @@ Status BetaRowsetWriter::add_block(const 
vectorized::Block* block) {
     return _add_block(block, &_segment_writer);
 }
 
+vectorized::VMergeIterator* BetaRowsetWriter::get_segcompaction_reader(
+        SegCompactionCandidatesSharedPtr segments, std::shared_ptr<Schema> 
schema,
+        OlapReaderStatistics* stat) {
+    StorageReadOptions read_options;
+    read_options.stats = stat;
+    read_options.use_page_cache = false;
+    read_options.tablet_schema = _context.tablet_schema;
+    std::vector<std::unique_ptr<RowwiseIterator>> seg_iterators;
+    for (auto& seg_ptr : *segments) {
+        std::unique_ptr<RowwiseIterator> iter;
+        auto s = seg_ptr->new_iterator(*schema, read_options, &iter);
+        if (!s.ok()) {
+            LOG(WARNING) << "failed to create iterator[" << seg_ptr->id() << 
"]: " << s.to_string();
+        }
+        seg_iterators.push_back(std::move(iter));
+    }
+    std::vector<RowwiseIterator*> iterators;
+    for (auto& owned_it : seg_iterators) {
+        // transfer ownership
+        iterators.push_back(owned_it.release());
+    }
+    bool is_unique = (_context.tablet_schema->keys_type() == UNIQUE_KEYS);
+    bool is_reverse = false;
+    auto merge_itr = vectorized::new_merge_iterator(iterators, -1, is_unique, 
is_reverse, nullptr);
+    merge_itr->init(read_options);
+
+    return (vectorized::VMergeIterator*)merge_itr;
+}
+
+std::unique_ptr<segment_v2::SegmentWriter> 
BetaRowsetWriter::create_segcompaction_writer(
+        uint64_t begin, uint64_t end) {
+    Status status;
+    std::unique_ptr<segment_v2::SegmentWriter> writer = nullptr;
+    status = _create_segment_writer_for_segcompaction(&writer, begin, end);
+    if (status != Status::OK()) {
+        writer = nullptr;
+        LOG(ERROR) << "failed to create segment writer for begin:" << begin << 
" end:" << end
+                   << " path:" << writer->get_data_dir()->path();
+    }
+    if (writer->get_data_dir())
+        LOG(INFO) << "segcompaction segment writer created for begin:" << 
begin << " end:" << end
+                  << " path:" << writer->get_data_dir()->path();
+    return writer;
+}
+
+Status BetaRowsetWriter::delete_original_segments(uint32_t begin, uint32_t 
end) {
+    auto fs = _rowset_meta->fs();
+    if (!fs) {
+        return Status::OLAPInternalError(OLAP_ERR_INIT_FAILED);
+    }
+    for (uint32_t i = begin; i <= end; ++i) {
+        auto seg_path = BetaRowset::local_segment_path(_context.tablet_path, 
_context.rowset_id, i);
+        // Even if an error is encountered, these files that have not been 
cleaned up
+        // will be cleaned up by the GC background. So here we only print the 
error
+        // message when we encounter an error.
+        WARN_IF_ERROR(fs->delete_file(seg_path),
+                      strings::Substitute("Failed to delete file=$0", 
seg_path));
+    }
+    return Status::OK();
+}
+
+void BetaRowsetWriter::rename_compacted_segments(int64_t begin, int64_t end) {
+    int ret;
+    auto src_seg_path = 
BetaRowset::local_segment_path_segcompacted(_context.tablet_path,
+                                                                    
_context.rowset_id, begin, end);
+    auto dst_seg_path = BetaRowset::local_segment_path(_context.tablet_path, 
_context.rowset_id,
+                                                       _num_segcompacted++);
+    ret = rename(src_seg_path.c_str(), dst_seg_path.c_str());
+    DCHECK_EQ(ret, 0);
+}
+
+// todo: will rename only do the job? maybe need deep modification
+void BetaRowsetWriter::rename_compacted_segment_plain(uint64_t seg_id) {
+    int ret;
+    auto src_seg_path =
+            BetaRowset::local_segment_path(_context.tablet_path, 
_context.rowset_id, seg_id);
+    auto dst_seg_path = BetaRowset::local_segment_path(_context.tablet_path, 
_context.rowset_id,
+                                                       _num_segcompacted++);
+    LOG(INFO) << "segcompaction skip this segment. rename " << src_seg_path << 
" to "
+              << dst_seg_path;
+    if (src_seg_path.compare(dst_seg_path) != 0) {
+        CHECK_EQ(_segid_statistics_map.find(seg_id + 1) == 
_segid_statistics_map.end(), false);
+        CHECK_EQ(_segid_statistics_map.find(_num_segcompacted) == 
_segid_statistics_map.end(),
+                 true);
+        statistics org = _segid_statistics_map[seg_id + 1];
+        _segid_statistics_map.emplace(_num_segcompacted, org);
+        clear_statistics_for_deleting_segments(seg_id, seg_id);
+        ret = rename(src_seg_path.c_str(), dst_seg_path.c_str());
+        DCHECK_EQ(ret, 0);
+    }
+}
+
+void BetaRowsetWriter::clear_statistics_for_deleting_segments(uint64_t begin, 
uint64_t end) {
+    LOG(INFO) << "_segid_statistics_map clear record segid range from:" << 
begin + 1
+              << " to:" << end + 1;
+    for (int i = begin; i <= end; ++i) {
+        _segid_statistics_map.erase(i + 1);
+    }
+}
+
+Status BetaRowsetWriter::do_segcompaction(SegCompactionCandidatesSharedPtr 
segments) {
+    uint64_t begin = (*(segments->begin()))->id();
+    uint64_t end = (*(segments->end() - 1))->id();
+    LOG(INFO) << "BetaRowsetWriter:" << this << " do segcompaction at " << 
segments->size()
+              << " segments. Begin:" << begin << " End:" << end;
+    uint64_t begin_time = GetCurrentTimeMicros();
+
+    auto schema = std::make_shared<Schema>(_context.tablet_schema->columns(),
+                                           
_context.tablet_schema->columns().size());
+    std::unique_ptr<OlapReaderStatistics> stat(new OlapReaderStatistics());
+    vectorized::VMergeIterator* reader = get_segcompaction_reader(segments, 
schema, stat.get());
+    auto writer = create_segcompaction_writer(begin, end);
+    assert(writer != nullptr);
+    vectorized::Block block = _context.tablet_schema->create_block();
+    while (true) {
+        auto status = reader->next_batch(&block);
+        if (status != Status::OK()) {
+            assert(status.is_end_of_file());
+            break;
+        }
+        _add_block_for_segcompaction(&block, &writer);
+        block.clear_column_data();
+    }
+    clear_statistics_for_deleting_segments(begin, end);
+    RETURN_NOT_OK(_flush_segment_writer(&writer));
+    delete_original_segments(begin, end);
+    rename_compacted_segments(begin, end);
+
+    if (VLOG_DEBUG_IS_ON) {
+        std::stringstream ss;
+        for (const auto& entry : 
std::filesystem::directory_iterator(_context.tablet_path)) {
+            ss << "[" << entry.path() << "]";
+        }
+        VLOG_DEBUG << "_segcompacted_point:" << _segcompacted_point
+                   << " _num_segment:" << _num_segment << " 
_num_segcompacted:" << _num_segcompacted
+                   << " list directory:" << ss.str();
+    }
+
+    CHECK_EQ(_is_doing_segcompaction, true);
+    _is_doing_segcompaction = false;
+    _segcompacting_cond.notify_all();
+
+    uint64_t elapsed = GetCurrentTimeMicros() - begin_time;
+    LOG(INFO) << "BetaRowsetWriter:" << this << " segcompaction completed. 
elapsed time:" << elapsed
+              << "us";
+
+    return Status::OK();
+}
+
+Status BetaRowsetWriter::load_noncompacted_segments(
+        std::vector<segment_v2::SegmentSharedPtr>* segments) {
+    auto fs = _rowset_meta->fs();
+    if (!fs) {
+        return Status::OLAPInternalError(OLAP_ERR_INIT_FAILED);
+    }
+    for (int seg_id = _segcompacted_point; seg_id < _num_segment; ++seg_id) {
+        auto seg_path =
+                BetaRowset::local_segment_path(_context.tablet_path, 
_context.rowset_id, seg_id);
+        auto cache_path =
+                BetaRowset::local_cache_path(_context.tablet_path, 
_context.rowset_id, seg_id);
+        std::shared_ptr<segment_v2::Segment> segment;
+        auto s = segment_v2::Segment::open(fs, seg_path, cache_path, seg_id, 
_context.tablet_schema,
+                                           &segment);
+        if (!s.ok()) {
+            LOG(WARNING) << "failed to open segment. " << seg_path << ":" << 
s.to_string();
+            return Status::OLAPInternalError(OLAP_ERR_ROWSET_LOAD_FAILED);
+        }
+        segments->push_back(std::move(segment));
+    }
+    return Status::OK();
+}
+
+void BetaRowsetWriter::find_longest_consecutive_small_segment(
+        SegCompactionCandidatesSharedPtr segments) {
+    std::vector<segment_v2::SegmentSharedPtr> all_segments;
+    load_noncompacted_segments(&all_segments);
+
+    std::stringstream ss_all;
+    for (auto& segment : all_segments) {
+        ss_all << "[id:" << segment->id() << " num_rows:" << 
segment->num_rows() << "]";
+    }
+    LOG(INFO) << "all noncompacted segments num:" << all_segments.size()
+              << " list of segments:" << ss_all.str();
+
+    bool is_terminated_by_big = false;
+    bool let_big_terminate = false;
+    size_t small_threshold = config::segcompaction_small_threshold;
+    for (int64_t i = 0; i < all_segments.size(); ++i) {
+        segment_v2::SegmentSharedPtr seg = all_segments[i];
+        if (seg->num_rows() > small_threshold) {
+            if (let_big_terminate) {
+                is_terminated_by_big = true;
+                break;
+            } else {
+                rename_compacted_segment_plain(_segcompacted_point);
+                ++_segcompacted_point;
+            }
+        } else {
+            let_big_terminate = true; // break if find a big after small
+            segments->push_back(seg);
+            ++_segcompacted_point;
+        }
+    }
+    size_t s = segments->size();
+    if (!is_terminated_by_big && s <= 
(config::segcompaction_threshold_segment_num / 2)) {
+        // start with big segments and end with small, better to do it in next
+        // round to compact more at once
+        _segcompacted_point -= s;
+        segments->clear();
+        LOG(INFO) << "candidate segments num too small:" << s;
+        return;
+    }
+    if (s == 1) { // poor bachelor, let it go
+        LOG(INFO) << "only one candidate segment";
+        rename_compacted_segment_plain(_segcompacted_point - 1);
+        segments->clear();
+        return;
+    }
+    std::stringstream ss;
+    for (auto& segment : (*segments.get())) {
+        ss << "[id:" << segment->id() << " num_rows:" << segment->num_rows() 
<< "]";
+    }
+    LOG(INFO) << "candidate segments num:" << s << " list of candidates:" << 
ss.str();
+}
+
+SegCompactionCandidatesSharedPtr 
BetaRowsetWriter::get_segcompaction_candidates(bool is_last) {
+    SegCompactionCandidatesSharedPtr segments = 
std::make_shared<SegCompactionCandidates>();
+    if (is_last) {

Review Comment:
   1. Should we check `segcompaction_small_threshold` here as well?
   E.g. you compact 10 segments and, for some reason, it runs very slowly; by the
   time it has finished, all of the load data has been written, generating 100
   segments, all of them larger than 1MB. In that case we should not compact them,
   right?
   
   2. In the previous case, if there are 100 segments left, should we do segment
   compaction on all of them at once?



##########
be/src/olap/rowset/beta_rowset_writer.cpp:
##########
@@ -219,17 +516,35 @@ Status BetaRowsetWriter::flush_single_memtable(const 
vectorized::Block* block) {
     if (block->rows() == 0) {
         return Status::OK();
     }
+    segcompaction_if_necessary();
     std::unique_ptr<segment_v2::SegmentWriter> writer;
     RETURN_NOT_OK(_create_segment_writer(&writer));
     RETURN_NOT_OK(_add_block(block, &writer));
     RETURN_NOT_OK(_flush_segment_writer(&writer));
     return Status::OK();
 }
 
+void BetaRowsetWriter::wait_flying_segcompaction() {
+    uint64_t begin_wait = GetCurrentTimeMicros();
+    while (_is_doing_segcompaction) { // TODO: memory barrier?
+        // change sync wait to async?
+        std::unique_lock<std::mutex> l(_segcompacting_cond_lock);

Review Comment:
   This lock is not used anywhere else; maybe a thread token would be more suitable?



##########
be/src/olap/rowset/beta_rowset_writer.cpp:
##########
@@ -102,6 +110,284 @@ Status BetaRowsetWriter::add_block(const 
vectorized::Block* block) {
     return _add_block(block, &_segment_writer);
 }
 
+vectorized::VMergeIterator* BetaRowsetWriter::get_segcompaction_reader(
+        SegCompactionCandidatesSharedPtr segments, std::shared_ptr<Schema> 
schema,
+        OlapReaderStatistics* stat) {
+    StorageReadOptions read_options;
+    read_options.stats = stat;
+    read_options.use_page_cache = false;
+    read_options.tablet_schema = _context.tablet_schema;
+    std::vector<std::unique_ptr<RowwiseIterator>> seg_iterators;
+    for (auto& seg_ptr : *segments) {
+        std::unique_ptr<RowwiseIterator> iter;
+        auto s = seg_ptr->new_iterator(*schema, read_options, &iter);
+        if (!s.ok()) {
+            LOG(WARNING) << "failed to create iterator[" << seg_ptr->id() << 
"]: " << s.to_string();
+        }
+        seg_iterators.push_back(std::move(iter));
+    }
+    std::vector<RowwiseIterator*> iterators;
+    for (auto& owned_it : seg_iterators) {
+        // transfer ownership
+        iterators.push_back(owned_it.release());
+    }
+    bool is_unique = (_context.tablet_schema->keys_type() == UNIQUE_KEYS);
+    bool is_reverse = false;
+    auto merge_itr = vectorized::new_merge_iterator(iterators, -1, is_unique, 
is_reverse, nullptr);
+    merge_itr->init(read_options);
+
+    return (vectorized::VMergeIterator*)merge_itr;
+}
+
+std::unique_ptr<segment_v2::SegmentWriter> 
BetaRowsetWriter::create_segcompaction_writer(
+        uint64_t begin, uint64_t end) {
+    Status status;
+    std::unique_ptr<segment_v2::SegmentWriter> writer = nullptr;
+    status = _create_segment_writer_for_segcompaction(&writer, begin, end);
+    if (status != Status::OK()) {
+        writer = nullptr;
+        LOG(ERROR) << "failed to create segment writer for begin:" << begin << 
" end:" << end
+                   << " path:" << writer->get_data_dir()->path();
+    }
+    if (writer->get_data_dir())
+        LOG(INFO) << "segcompaction segment writer created for begin:" << 
begin << " end:" << end
+                  << " path:" << writer->get_data_dir()->path();
+    return writer;
+}
+
+Status BetaRowsetWriter::delete_original_segments(uint32_t begin, uint32_t 
end) {
+    auto fs = _rowset_meta->fs();
+    if (!fs) {
+        return Status::OLAPInternalError(OLAP_ERR_INIT_FAILED);
+    }
+    for (uint32_t i = begin; i <= end; ++i) {
+        auto seg_path = BetaRowset::local_segment_path(_context.tablet_path, 
_context.rowset_id, i);
+        // Even if an error is encountered, these files that have not been 
cleaned up
+        // will be cleaned up by the GC background. So here we only print the 
error
+        // message when we encounter an error.
+        WARN_IF_ERROR(fs->delete_file(seg_path),
+                      strings::Substitute("Failed to delete file=$0", 
seg_path));
+    }
+    return Status::OK();
+}
+
+void BetaRowsetWriter::rename_compacted_segments(int64_t begin, int64_t end) {
+    int ret;
+    auto src_seg_path = 
BetaRowset::local_segment_path_segcompacted(_context.tablet_path,
+                                                                    
_context.rowset_id, begin, end);
+    auto dst_seg_path = BetaRowset::local_segment_path(_context.tablet_path, 
_context.rowset_id,
+                                                       _num_segcompacted++);
+    ret = rename(src_seg_path.c_str(), dst_seg_path.c_str());
+    DCHECK_EQ(ret, 0);
+}
+
+// todo: will rename only do the job? maybe need deep modification
+void BetaRowsetWriter::rename_compacted_segment_plain(uint64_t seg_id) {
+    int ret;
+    auto src_seg_path =
+            BetaRowset::local_segment_path(_context.tablet_path, 
_context.rowset_id, seg_id);
+    auto dst_seg_path = BetaRowset::local_segment_path(_context.tablet_path, 
_context.rowset_id,
+                                                       _num_segcompacted++);
+    LOG(INFO) << "segcompaction skip this segment. rename " << src_seg_path << 
" to "
+              << dst_seg_path;
+    if (src_seg_path.compare(dst_seg_path) != 0) {
+        CHECK_EQ(_segid_statistics_map.find(seg_id + 1) == 
_segid_statistics_map.end(), false);
+        CHECK_EQ(_segid_statistics_map.find(_num_segcompacted) == 
_segid_statistics_map.end(),
+                 true);
+        statistics org = _segid_statistics_map[seg_id + 1];
+        _segid_statistics_map.emplace(_num_segcompacted, org);
+        clear_statistics_for_deleting_segments(seg_id, seg_id);
+        ret = rename(src_seg_path.c_str(), dst_seg_path.c_str());
+        DCHECK_EQ(ret, 0);
+    }
+}
+
+void BetaRowsetWriter::clear_statistics_for_deleting_segments(uint64_t begin, 
uint64_t end) {
+    LOG(INFO) << "_segid_statistics_map clear record segid range from:" << 
begin + 1
+              << " to:" << end + 1;
+    for (int i = begin; i <= end; ++i) {
+        _segid_statistics_map.erase(i + 1);
+    }
+}
+
+Status BetaRowsetWriter::do_segcompaction(SegCompactionCandidatesSharedPtr 
segments) {
+    uint64_t begin = (*(segments->begin()))->id();
+    uint64_t end = (*(segments->end() - 1))->id();
+    LOG(INFO) << "BetaRowsetWriter:" << this << " do segcompaction at " << 
segments->size()
+              << " segments. Begin:" << begin << " End:" << end;
+    uint64_t begin_time = GetCurrentTimeMicros();
+
+    auto schema = std::make_shared<Schema>(_context.tablet_schema->columns(),
+                                           
_context.tablet_schema->columns().size());
+    std::unique_ptr<OlapReaderStatistics> stat(new OlapReaderStatistics());
+    vectorized::VMergeIterator* reader = get_segcompaction_reader(segments, 
schema, stat.get());
+    auto writer = create_segcompaction_writer(begin, end);
+    assert(writer != nullptr);
+    vectorized::Block block = _context.tablet_schema->create_block();
+    while (true) {
+        auto status = reader->next_batch(&block);
+        if (status != Status::OK()) {
+            assert(status.is_end_of_file());
+            break;
+        }
+        _add_block_for_segcompaction(&block, &writer);
+        block.clear_column_data();
+    }
+    clear_statistics_for_deleting_segments(begin, end);
+    RETURN_NOT_OK(_flush_segment_writer(&writer));
+    delete_original_segments(begin, end);
+    rename_compacted_segments(begin, end);
+
+    if (VLOG_DEBUG_IS_ON) {
+        std::stringstream ss;
+        for (const auto& entry : 
std::filesystem::directory_iterator(_context.tablet_path)) {
+            ss << "[" << entry.path() << "]";
+        }
+        VLOG_DEBUG << "_segcompacted_point:" << _segcompacted_point
+                   << " _num_segment:" << _num_segment << " 
_num_segcompacted:" << _num_segcompacted
+                   << " list directory:" << ss.str();
+    }
+
+    CHECK_EQ(_is_doing_segcompaction, true);
+    _is_doing_segcompaction = false;
+    _segcompacting_cond.notify_all();
+
+    uint64_t elapsed = GetCurrentTimeMicros() - begin_time;
+    LOG(INFO) << "BetaRowsetWriter:" << this << " segcompaction completed. 
elapsed time:" << elapsed
+              << "us";
+
+    return Status::OK();
+}
+
+Status BetaRowsetWriter::load_noncompacted_segments(
+        std::vector<segment_v2::SegmentSharedPtr>* segments) {
+    auto fs = _rowset_meta->fs();
+    if (!fs) {
+        return Status::OLAPInternalError(OLAP_ERR_INIT_FAILED);
+    }
+    for (int seg_id = _segcompacted_point; seg_id < _num_segment; ++seg_id) {
+        auto seg_path =
+                BetaRowset::local_segment_path(_context.tablet_path, 
_context.rowset_id, seg_id);
+        auto cache_path =
+                BetaRowset::local_cache_path(_context.tablet_path, 
_context.rowset_id, seg_id);
+        std::shared_ptr<segment_v2::Segment> segment;
+        auto s = segment_v2::Segment::open(fs, seg_path, cache_path, seg_id, 
_context.tablet_schema,
+                                           &segment);
+        if (!s.ok()) {
+            LOG(WARNING) << "failed to open segment. " << seg_path << ":" << 
s.to_string();
+            return Status::OLAPInternalError(OLAP_ERR_ROWSET_LOAD_FAILED);
+        }
+        segments->push_back(std::move(segment));
+    }
+    return Status::OK();
+}
+
+void BetaRowsetWriter::find_longest_consecutive_small_segment(
+        SegCompactionCandidatesSharedPtr segments) {
+    std::vector<segment_v2::SegmentSharedPtr> all_segments;
+    load_noncompacted_segments(&all_segments);
+
+    std::stringstream ss_all;
+    for (auto& segment : all_segments) {
+        ss_all << "[id:" << segment->id() << " num_rows:" << 
segment->num_rows() << "]";
+    }
+    LOG(INFO) << "all noncompacted segments num:" << all_segments.size()
+              << " list of segments:" << ss_all.str();
+
+    bool is_terminated_by_big = false;
+    bool let_big_terminate = false;
+    size_t small_threshold = config::segcompaction_small_threshold;
+    for (int64_t i = 0; i < all_segments.size(); ++i) {
+        segment_v2::SegmentSharedPtr seg = all_segments[i];
+        if (seg->num_rows() > small_threshold) {
+            if (let_big_terminate) {
+                is_terminated_by_big = true;
+                break;
+            } else {
+                rename_compacted_segment_plain(_segcompacted_point);
+                ++_segcompacted_point;
+            }
+        } else {
+            let_big_terminate = true; // break if find a big after small
+            segments->push_back(seg);
+            ++_segcompacted_point;
+        }
+    }
+    size_t s = segments->size();
+    if (!is_terminated_by_big && s <= 
(config::segcompaction_threshold_segment_num / 2)) {
+        // start with big segments and end with small, better to do it in next
+        // round to compact more at once
+        _segcompacted_point -= s;
+        segments->clear();
+        LOG(INFO) << "candidate segments num too small:" << s;
+        return;
+    }
+    if (s == 1) { // poor bachelor, let it go
+        LOG(INFO) << "only one candidate segment";
+        rename_compacted_segment_plain(_segcompacted_point - 1);
+        segments->clear();
+        return;
+    }
+    std::stringstream ss;
+    for (auto& segment : (*segments.get())) {
+        ss << "[id:" << segment->id() << " num_rows:" << segment->num_rows() 
<< "]";
+    }
+    LOG(INFO) << "candidate segments num:" << s << " list of candidates:" << 
ss.str();
+}
+
+SegCompactionCandidatesSharedPtr 
BetaRowsetWriter::get_segcompaction_candidates(bool is_last) {
+    SegCompactionCandidatesSharedPtr segments = 
std::make_shared<SegCompactionCandidates>();
+    if (is_last) {
+        load_noncompacted_segments(segments.get());
+        if (segments->size() == 1) {
+            LOG(INFO) << "only one last candidate segment";
+            rename_compacted_segment_plain(_segcompacted_point);
+            segments->clear();
+        }
+    } else {
+        find_longest_consecutive_small_segment(segments);
+    }
+    return segments;
+}
+
+void BetaRowsetWriter::segcompaction_if_necessary() {
+    if (!config::enable_segcompaction || 
!config::enable_storage_vectorization) {
+        return;
+    }
+    if (!_is_doing_segcompaction &&
+        ((_num_segment - _segcompacted_point) >= 
config::segcompaction_threshold_segment_num)) {
+        _is_doing_segcompaction = true;
+        SegCompactionCandidatesSharedPtr segments = 
get_segcompaction_candidates(false);
+        if (segments->size() > 0) {
+            LOG(INFO) << "submit segcompaction task, segment num:" << 
_num_segment
+                      << ", segcompacted_point:" << _segcompacted_point;
+            StorageEngine::instance()->submit_seg_compaction_task(this, 
segments);
+        } else {
+            _is_doing_segcompaction = false;
+        }
+    }
+}
+
+void BetaRowsetWriter::segcompaction_ramaining_if_necessary() {
+    if (!config::enable_segcompaction || 
!config::enable_storage_vectorization) {
+        return;
+    }
+    if (!is_segcompacted() || _segcompacted_point == _num_segment) {
+        // no need if never segcompact before or all segcompacted
+        return;
+    }
+    CHECK_EQ(_is_doing_segcompaction, false);

Review Comment:
   Use `DCHECK_EQ` here instead of `CHECK_EQ`.



##########
be/src/olap/rowset/beta_rowset_writer.cpp:
##########
@@ -102,6 +110,284 @@ Status BetaRowsetWriter::add_block(const 
vectorized::Block* block) {
     return _add_block(block, &_segment_writer);
 }
 
+// Builds a merge iterator over `segments` for segcompaction to read from.
+//
+// @param segments  segments to open; one iterator is created per segment
+// @param schema    schema passed to every segment's new_iterator()
+// @param stat      statistics object handed to the iterators through the read options
+// @return an initialized merge iterator owned by the caller, or nullptr when a segment
+//         iterator cannot be created or the merge iterator fails to initialize.
+//         Callers must check for nullptr before use.
+vectorized::VMergeIterator* BetaRowsetWriter::get_segcompaction_reader(
+        SegCompactionCandidatesSharedPtr segments, std::shared_ptr<Schema> schema,
+        OlapReaderStatistics* stat) {
+    StorageReadOptions read_options;
+    read_options.stats = stat;
+    read_options.use_page_cache = false;
+    read_options.tablet_schema = _context.tablet_schema;
+
+    std::vector<std::unique_ptr<RowwiseIterator>> seg_iterators;
+    seg_iterators.reserve(segments->size());
+    for (auto& seg_ptr : *segments) {
+        std::unique_ptr<RowwiseIterator> iter;
+        auto s = seg_ptr->new_iterator(*schema, read_options, &iter);
+        if (!s.ok()) {
+            LOG(WARNING) << "failed to create iterator[" << seg_ptr->id()
+                         << "]: " << s.to_string();
+            // Never pass a missing iterator on to the merge iterator. The iterators
+            // created so far are freed when seg_iterators goes out of scope.
+            return nullptr;
+        }
+        seg_iterators.push_back(std::move(iter));
+    }
+
+    // Ownership is transferred to raw pointers only after every segment iterator was
+    // created, so the early return above cannot leak.
+    std::vector<RowwiseIterator*> iterators;
+    iterators.reserve(seg_iterators.size());
+    for (auto& owned_it : seg_iterators) {
+        iterators.push_back(owned_it.release());
+    }
+    const bool is_unique = (_context.tablet_schema->keys_type() == UNIQUE_KEYS);
+    const bool is_reverse = false;
+    std::unique_ptr<RowwiseIterator> merge_itr(
+            vectorized::new_merge_iterator(iterators, -1, is_unique, is_reverse, nullptr));
+    auto init_status = merge_itr->init(read_options);
+    if (!init_status.ok()) {
+        LOG(WARNING) << "failed to init segcompaction merge iterator: "
+                     << init_status.to_string();
+        return nullptr;
+    }
+    return static_cast<vectorized::VMergeIterator*>(merge_itr.release());
+}
+
+// Creates the segment writer used to write the segcompaction output for the segment
+// range [begin, end].
+//
+// @return the new writer, or nullptr when it could not be created
+std::unique_ptr<segment_v2::SegmentWriter> BetaRowsetWriter::create_segcompaction_writer(
+        uint64_t begin, uint64_t end) {
+    std::unique_ptr<segment_v2::SegmentWriter> writer;
+    Status status = _create_segment_writer_for_segcompaction(&writer, begin, end);
+    if (!status.ok() || writer == nullptr) {
+        // Do not dereference `writer` on this path: there is no usable writer to ask
+        // for a data dir, so report the status instead.
+        LOG(ERROR) << "failed to create segment writer for begin:" << begin << " end:" << end
+                   << " status:" << status.to_string();
+        return nullptr;
+    }
+    if (writer->get_data_dir() != nullptr) {
+        LOG(INFO) << "segcompaction segment writer created for begin:" << begin
+                  << " end:" << end << " path:" << writer->get_data_dir()->path();
+    }
+    return writer;
+}
+
+// Deletes the segment files with ids in [begin, end] from the rowset's file system.
+//
+// Returns OLAP_ERR_INIT_FAILED when the rowset meta provides no file system. A failure
+// to delete an individual file is only logged and does not change the returned status.
+Status BetaRowsetWriter::delete_original_segments(uint32_t begin, uint32_t end) {
+    auto fs = _rowset_meta->fs();
+    if (fs == nullptr) {
+        return Status::OLAPInternalError(OLAP_ERR_INIT_FAILED);
+    }
+    for (uint32_t seg_id = begin; seg_id <= end; ++seg_id) {
+        auto seg_path =
+                BetaRowset::local_segment_path(_context.tablet_path, _context.rowset_id, seg_id);
+        // Files left behind by a failed delete are picked up later by the background GC,
+        // so an error here is reported with a warning and nothing more.
+        WARN_IF_ERROR(fs->delete_file(seg_path),
+                      strings::Substitute("Failed to delete file=$0", seg_path));
+    }
+    return Status::OK();
+}
+
+// Renames the segcompaction output file for the range [begin, end] to a regular segment
+// file name. The current value of _num_segcompacted is used as the new segment id and
+// the counter is then incremented.
+void BetaRowsetWriter::rename_compacted_segments(int64_t begin, int64_t end) {
+    const auto src_seg_path = BetaRowset::local_segment_path_segcompacted(
+            _context.tablet_path, _context.rowset_id, begin, end);
+    const auto dst_seg_path = BetaRowset::local_segment_path(
+            _context.tablet_path, _context.rowset_id, _num_segcompacted++);
+    const int ret = rename(src_seg_path.c_str(), dst_seg_path.c_str());
+    if (ret != 0) {
+        // DCHECK_EQ is compiled out of release builds; log so the failure is not silent.
+        LOG(WARNING) << "failed to rename " << src_seg_path << " to " << dst_seg_path
+                     << ", ret:" << ret;
+    }
+    DCHECK_EQ(ret, 0);
+}
+
+// todo: will rename only do the job? maybe need deep modification
+//
+// Gives a segment that segcompaction skips its final segment id: the file of `seg_id`
+// is renamed to the id taken from _num_segcompacted (which is then incremented), and
+// its entry in _segid_statistics_map is moved along with it. When the source and
+// destination paths are identical nothing but the counter changes.
+void BetaRowsetWriter::rename_compacted_segment_plain(uint64_t seg_id) {
+    const auto src_seg_path =
+            BetaRowset::local_segment_path(_context.tablet_path, _context.rowset_id, seg_id);
+    const auto dst_seg_path = BetaRowset::local_segment_path(
+            _context.tablet_path, _context.rowset_id, _num_segcompacted++);
+    LOG(INFO) << "segcompaction skip this segment. rename " << src_seg_path << " to "
+              << dst_seg_path;
+    if (src_seg_path == dst_seg_path) {
+        return;
+    }
+    // The statistics map is keyed by segment id + 1 here. _num_segcompacted was already
+    // incremented above, so it equals the destination id + 1.
+    CHECK_EQ(_segid_statistics_map.find(seg_id + 1) == _segid_statistics_map.end(), false);
+    CHECK_EQ(_segid_statistics_map.find(_num_segcompacted) == _segid_statistics_map.end(),
+             true);
+    statistics org = _segid_statistics_map[seg_id + 1];
+    _segid_statistics_map.emplace(_num_segcompacted, org);
+    clear_statistics_for_deleting_segments(seg_id, seg_id);
+    const int ret = rename(src_seg_path.c_str(), dst_seg_path.c_str());
+    if (ret != 0) {
+        // DCHECK_EQ is compiled out of release builds; log so the failure is not silent.
+        LOG(WARNING) << "failed to rename " << src_seg_path << " to " << dst_seg_path
+                     << ", ret:" << ret;
+    }
+    DCHECK_EQ(ret, 0);
+}
+
+// Erases the _segid_statistics_map entries of the segments with ids in [begin, end].
+// Entries are erased under the key id + 1.
+void BetaRowsetWriter::clear_statistics_for_deleting_segments(uint64_t begin, uint64_t end) {
+    LOG(INFO) << "_segid_statistics_map clear record segid range from:" << begin + 1
+              << " to:" << end + 1;
+    // Use the parameters' own unsigned type for the index: an `int` index would narrow
+    // large ids and compare a signed value against an unsigned bound.
+    for (uint64_t seg_id = begin; seg_id <= end; ++seg_id) {
+        _segid_statistics_map.erase(seg_id + 1);
+    }
+}
+
+Status BetaRowsetWriter::do_segcompaction(SegCompactionCandidatesSharedPtr 
segments) {
+    uint64_t begin = (*(segments->begin()))->id();
+    uint64_t end = (*(segments->end() - 1))->id();
+    LOG(INFO) << "BetaRowsetWriter:" << this << " do segcompaction at " << 
segments->size()
+              << " segments. Begin:" << begin << " End:" << end;
+    uint64_t begin_time = GetCurrentTimeMicros();
+
+    auto schema = std::make_shared<Schema>(_context.tablet_schema->columns(),
+                                           
_context.tablet_schema->columns().size());
+    std::unique_ptr<OlapReaderStatistics> stat(new OlapReaderStatistics());
+    vectorized::VMergeIterator* reader = get_segcompaction_reader(segments, 
schema, stat.get());
+    auto writer = create_segcompaction_writer(begin, end);
+    assert(writer != nullptr);
+    vectorized::Block block = _context.tablet_schema->create_block();
+    while (true) {
+        auto status = reader->next_batch(&block);
+        if (status != Status::OK()) {
+            assert(status.is_end_of_file());
+            break;
+        }
+        _add_block_for_segcompaction(&block, &writer);
+        block.clear_column_data();
+    }
+    clear_statistics_for_deleting_segments(begin, end);
+    RETURN_NOT_OK(_flush_segment_writer(&writer));
+    delete_original_segments(begin, end);
+    rename_compacted_segments(begin, end);
+
+    if (VLOG_DEBUG_IS_ON) {
+        std::stringstream ss;
+        for (const auto& entry : 
std::filesystem::directory_iterator(_context.tablet_path)) {
+            ss << "[" << entry.path() << "]";
+        }
+        VLOG_DEBUG << "_segcompacted_point:" << _segcompacted_point
+                   << " _num_segment:" << _num_segment << " 
_num_segcompacted:" << _num_segcompacted
+                   << " list directory:" << ss.str();
+    }
+
+    CHECK_EQ(_is_doing_segcompaction, true);
+    _is_doing_segcompaction = false;
+    _segcompacting_cond.notify_all();
+
+    uint64_t elapsed = GetCurrentTimeMicros() - begin_time;
+    LOG(INFO) << "BetaRowsetWriter:" << this << " segcompaction completed. 
elapsed time:" << elapsed
+              << "us";
+

Review Comment:
   We should also check correctness for segment compaction; refer to
`Compaction::check_correctness`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to