morningman commented on code in PR #12866:
URL: https://github.com/apache/doris/pull/12866#discussion_r1011724897


##########
be/src/common/config.h:
##########
@@ -848,6 +851,11 @@ CONF_String(be_node_role, "mix");
 // Hide webserver page for safety.
 // Hide the be config page for webserver.
 CONF_Bool(hide_webserver_config_page, "false");
+CONF_Bool(enable_segcompaction, "false"); // currently only support vectorized 
storage
+// Trigger segcompaction if the num of segments in a rowset exceeds this 
threshold.
+CONF_Int32(segcompaction_threshold_segment_num, "10");
+
+CONF_Int32(segcompaction_small_threshold, "1048576");

Review Comment:
   We also need to update the documentation:
   ```
   docs/zh-CN/docs/admin-manual/config/be-config.md
   docs/en/docs/admin-manual/config/be-config.md
   ```



##########
be/src/common/config.h:
##########
@@ -848,6 +851,11 @@ CONF_String(be_node_role, "mix");
 // Hide webserver page for safety.
 // Hide the be config page for webserver.
 CONF_Bool(hide_webserver_config_page, "false");
+CONF_Bool(enable_segcompaction, "false"); // currently only support vectorized 
storage
+// Trigger segcompaction if the num of segments in a rowset exceeds this 
threshold.
+CONF_Int32(segcompaction_threshold_segment_num, "10");
+
+CONF_Int32(segcompaction_small_threshold, "1048576");

Review Comment:
   Please add a comment describing this config.



##########
be/src/olap/rowset/beta_rowset_writer.cpp:
##########
@@ -107,6 +120,428 @@ Status BetaRowsetWriter::add_block(const 
vectorized::Block* block) {
     return _add_block(block, &_segment_writer);
 }
 
+vectorized::VMergeIterator* BetaRowsetWriter::_get_segcompaction_reader(
+        SegCompactionCandidatesSharedPtr segments, std::shared_ptr<Schema> 
schema,
+        OlapReaderStatistics* stat, uint64_t* merged_row_stat) {
+    StorageReadOptions read_options;
+    read_options.stats = stat;
+    read_options.use_page_cache = false;
+    read_options.tablet_schema = _context.tablet_schema;
+    std::vector<std::unique_ptr<RowwiseIterator>> seg_iterators;
+    for (auto& seg_ptr : *segments) {
+        std::unique_ptr<RowwiseIterator> iter;
+        auto s = seg_ptr->new_iterator(*schema, read_options, &iter);
+        if (!s.ok()) {
+            LOG(WARNING) << "failed to create iterator[" << seg_ptr->id() << 
"]: " << s.to_string();
+            return nullptr;
+        }
+        seg_iterators.push_back(std::move(iter));
+    }
+    std::vector<RowwiseIterator*> iterators;
+    for (auto& owned_it : seg_iterators) {
+        // transfer ownership
+        iterators.push_back(owned_it.release());
+    }
+    bool is_unique = (_context.tablet_schema->keys_type() == UNIQUE_KEYS);
+    bool is_reverse = false;
+    auto merge_itr =
+            vectorized::new_merge_iterator(iterators, -1, is_unique, 
is_reverse, merged_row_stat);
+    DCHECK(merge_itr);
+    auto s = merge_itr->init(read_options);
+    if (!s.ok()) {
+        LOG(WARNING) << "failed to init iterator: " << s.to_string();
+        for (auto& itr : iterators) {
+            delete itr;
+        }
+        return nullptr;
+    }
+
+    return (vectorized::VMergeIterator*)merge_itr;
+}
+
+std::unique_ptr<segment_v2::SegmentWriter> 
BetaRowsetWriter::_create_segcompaction_writer(
+        uint64_t begin, uint64_t end) {
+    Status status;
+    std::unique_ptr<segment_v2::SegmentWriter> writer = nullptr;
+    status = _create_segment_writer_for_segcompaction(&writer, begin, end);
+    if (status != Status::OK() || writer == nullptr) {
+        LOG(ERROR) << "failed to create segment writer for begin:" << begin << 
" end:" << end
+                   << " path:" << writer->get_data_dir()->path() << " status:" 
<< status;
+        return nullptr;
+    } else {
+        return writer;
+    }
+}
+
+Status BetaRowsetWriter::_delete_original_segments(uint32_t begin, uint32_t 
end) {
+    auto fs = _rowset_meta->fs();
+    if (!fs) {
+        return Status::OLAPInternalError(OLAP_ERR_INIT_FAILED);
+    }
+    for (uint32_t i = begin; i <= end; ++i) {
+        auto seg_path = BetaRowset::segment_file_path(_context.rowset_dir, 
_context.rowset_id, i);
+        // Even if an error is encountered, these files that have not been 
cleaned up
+        // will be cleaned up by the GC background. So here we only print the 
error
+        // message when we encounter an error.
+        RETURN_NOT_OK_LOG(fs->delete_file(seg_path),
+                          strings::Substitute("Failed to delete file=$0", 
seg_path));
+    }
+    return Status::OK();
+}
+
+Status BetaRowsetWriter::_rename_compacted_segments(int64_t begin, int64_t 
end) {
+    int ret;
+    auto src_seg_path = 
BetaRowset::local_segment_path_segcompacted(_context.rowset_dir,
+                                                                    
_context.rowset_id, begin, end);
+    auto dst_seg_path = BetaRowset::segment_file_path(_context.rowset_dir, 
_context.rowset_id,
+                                                      _num_segcompacted++);
+    ret = rename(src_seg_path.c_str(), dst_seg_path.c_str());
+    if (ret) {
+        LOG(WARNING) << "failed to rename " << src_seg_path << " to " << 
dst_seg_path
+                     << ". ret:" << ret << " errno:" << errno;
+        return Status::OLAPInternalError(OLAP_ERR_ROWSET_RENAME_FILE_FAILED);
+    }
+    return Status::OK();
+}
+
+Status BetaRowsetWriter::_rename_compacted_segment_plain(uint64_t seg_id) {
+    if (seg_id == _num_segcompacted) {
+        return Status::OK();
+    }
+
+    int ret;
+    auto src_seg_path =
+            BetaRowset::segment_file_path(_context.rowset_dir, 
_context.rowset_id, seg_id);
+    auto dst_seg_path = BetaRowset::segment_file_path(_context.rowset_dir, 
_context.rowset_id,
+                                                      _num_segcompacted);
+    VLOG_DEBUG << "segcompaction skip this segment. rename " << src_seg_path 
<< " to "
+               << dst_seg_path;
+    {
+        std::lock_guard<std::mutex> lock(_segid_statistics_map_mutex);
+        DCHECK_EQ(_segid_statistics_map.find(seg_id) == 
_segid_statistics_map.end(), false);
+        DCHECK_EQ(_segid_statistics_map.find(_num_segcompacted) == 
_segid_statistics_map.end(),
+                  true);
+        Statistics org = _segid_statistics_map[seg_id];
+        _segid_statistics_map.emplace(_num_segcompacted, org);
+        _clear_statistics_for_deleting_segments_unsafe(seg_id, seg_id);
+    }
+    ++_num_segcompacted;
+    ret = rename(src_seg_path.c_str(), dst_seg_path.c_str());
+    if (ret) {
+        LOG(WARNING) << "failed to rename " << src_seg_path << " to " << 
dst_seg_path
+                     << ". ret:" << ret << " errno:" << errno;
+        return Status::OLAPInternalError(OLAP_ERR_ROWSET_RENAME_FILE_FAILED);
+    }
+    return Status::OK();
+}
+
+void BetaRowsetWriter::_clear_statistics_for_deleting_segments_unsafe(uint64_t 
begin,
+                                                                      uint64_t 
end) {
+    VLOG_DEBUG << "_segid_statistics_map clear record segid range from:" << 
begin << " to:" << end;
+    for (int i = begin; i <= end; ++i) {
+        _segid_statistics_map.erase(i);
+    }
+}
+
+Status 
BetaRowsetWriter::_check_correctness(std::unique_ptr<OlapReaderStatistics> stat,
+                                            uint64_t merged_row_stat, uint64_t 
row_count,
+                                            uint64_t begin, uint64_t end) {
+    uint64_t stat_read_row = stat->raw_rows_read;
+    uint64_t sum_target_row = 0;
+
+    {
+        std::lock_guard<std::mutex> lock(_segid_statistics_map_mutex);
+        for (int i = begin; i <= end; ++i) {
+            sum_target_row += _segid_statistics_map[i].row_num;
+        }
+    }
+
+    if (sum_target_row != stat_read_row) {
+        LOG(WARNING) << "read row_num does not match. expect read row:" << 
sum_target_row
+                     << " actual read row:" << stat_read_row;
+        return Status::OLAPInternalError(OLAP_ERR_CHECK_LINES_ERROR);
+    }
+
+    uint64_t total_row = row_count + merged_row_stat;
+    if (stat_read_row != total_row) {
+        LOG(WARNING) << "total row_num does not match. expect total row:" << 
total_row
+                     << " actual total row:" << stat_read_row;
+        return Status::OLAPInternalError(OLAP_ERR_CHECK_LINES_ERROR);
+    }
+    return Status::OK();
+}
+
+Status BetaRowsetWriter::_do_compact_segments(SegCompactionCandidatesSharedPtr 
segments) {
+    SCOPED_ATTACH_TASK(StorageEngine::instance()->segcompaction_mem_tracker(),
+                       ThreadContext::TaskType::COMPACTION);
+    // throttle segcompaction task if memory depleted.
+    if (MemTrackerLimiter::sys_mem_exceed_limit_check(GB_EXCHANGE_BYTE)) {
+        LOG(WARNING) << "skip segcompaction due to memory shortage";
+        return Status::OLAPInternalError(OLAP_ERR_FETCH_MEMORY_EXCEEDED);
+    }
+    uint64_t begin = (*(segments->begin()))->id();
+    uint64_t end = (*(segments->end() - 1))->id();
+    LOG(INFO) << "BetaRowsetWriter:" << this << " do segcompaction at " << 
segments->size()
+              << " segments. Begin:" << begin << " End:" << end;
+    uint64_t begin_time = GetCurrentTimeMicros();
+
+    auto schema = std::make_shared<Schema>(_context.tablet_schema->columns(),
+                                           
_context.tablet_schema->columns().size());
+    std::unique_ptr<OlapReaderStatistics> stat(new OlapReaderStatistics());
+    uint64_t merged_row_stat = 0;
+    vectorized::VMergeIterator* reader =
+            _get_segcompaction_reader(segments, schema, stat.get(), 
&merged_row_stat);
+    if (UNLIKELY(reader == nullptr)) {
+        LOG(WARNING) << "failed to get segcompaction reader";
+        return Status::OLAPInternalError(OLAP_ERR_SEGCOMPACTION_INIT_READER);
+    }
+    std::unique_ptr<vectorized::VMergeIterator> reader_ptr;
+    reader_ptr.reset(reader);
+    auto writer = _create_segcompaction_writer(begin, end);
+    if (UNLIKELY(writer == nullptr)) {
+        LOG(WARNING) << "failed to get segcompaction writer";
+        return Status::OLAPInternalError(OLAP_ERR_SEGCOMPACTION_INIT_WRITER);
+    }
+    uint64_t row_count = 0;
+    vectorized::Block block = _context.tablet_schema->create_block();
+    while (true) {
+        auto status = reader_ptr->next_batch(&block);
+        row_count += block.rows();
+        if (status != Status::OK()) {
+            if (LIKELY(status.is_end_of_file())) {
+                RETURN_NOT_OK_LOG(_add_block_for_segcompaction(&block, 
&writer),
+                                  "write block failed");
+                break;
+            } else {
+                LOG(WARNING) << "read block failed: " << status.to_string();
+                return status;
+            }
+        }
+        RETURN_NOT_OK_LOG(_add_block_for_segcompaction(&block, &writer), 
"write block failed");
+        block.clear_column_data();
+    }
+    RETURN_NOT_OK_LOG(_check_correctness(std::move(stat), merged_row_stat, 
row_count, begin, end),
+                      "check correctness failed");
+    {
+        std::lock_guard<std::mutex> lock(_segid_statistics_map_mutex);
+        _clear_statistics_for_deleting_segments_unsafe(begin, end);
+    }
+    RETURN_NOT_OK(_flush_segment_writer(&writer));
+
+    if (_segcompaction_file_writer != nullptr) {
+        _segcompaction_file_writer->close();
+    }
+
+    RETURN_NOT_OK(_delete_original_segments(begin, end));
+    RETURN_NOT_OK(_rename_compacted_segments(begin, end));
+
+    if (VLOG_DEBUG_IS_ON) {
+        std::stringstream ss;
+        for (const auto& entry : 
std::filesystem::directory_iterator(_context.rowset_dir)) {
+            ss << "[" << entry.path() << "]";
+        }
+        VLOG_DEBUG << "_segcompacted_point:" << _segcompacted_point
+                   << " _num_segment:" << _num_segment << " 
_num_segcompacted:" << _num_segcompacted
+                   << " list directory:" << ss.str();
+    }
+
+    _segcompacted_point += (end - begin + 1);
+
+    uint64_t elapsed = GetCurrentTimeMicros() - begin_time;
+    LOG(INFO) << "BetaRowsetWriter:" << this << " segcompaction completed. 
elapsed time:" << elapsed
+              << "us. _segcompacted_point update:" << _segcompacted_point;
+
+    return Status::OK();
+}
+
+void BetaRowsetWriter::compact_segments(SegCompactionCandidatesSharedPtr 
segments) {
+    Status status = _do_compact_segments(segments);
+    if (!status.ok()) {
+        int16_t errcode = status.precise_code();
+        switch (errcode) {
+        case OLAP_ERR_FETCH_MEMORY_EXCEEDED:
+        case OLAP_ERR_SEGCOMPACTION_INIT_READER:
+        case OLAP_ERR_SEGCOMPACTION_INIT_WRITER:
+            LOG(WARNING) << "segcompaction failed, try next time:" << status;
+            return;
+        default:
+            LOG(WARNING) << "segcompaction fatal, terminating the write job:" 
<< status;
+            // status will be checked by the next trigger of segcompaction or 
the final wait
+            _segcompaction_status.store(OLAP_ERR_OTHER_ERROR);
+        }
+    }
+    DCHECK_EQ(_is_doing_segcompaction, true);
+    {
+        std::lock_guard lk(_is_doing_segcompaction_lock);
+        _is_doing_segcompaction = false;
+        _segcompacting_cond.notify_all();
+    }
+}
+
+Status BetaRowsetWriter::_load_noncompacted_segments(
+        std::vector<segment_v2::SegmentSharedPtr>* segments) {
+    auto fs = _rowset_meta->fs();
+    if (!fs) {
+        return Status::OLAPInternalError(OLAP_ERR_INIT_FAILED);
+    }
+    for (int seg_id = _segcompacted_point; seg_id < _num_segment; ++seg_id) {
+        auto seg_path =
+                BetaRowset::segment_file_path(_context.rowset_dir, 
_context.rowset_id, seg_id);
+        auto cache_path =
+                BetaRowset::segment_cache_path(_context.rowset_dir, 
_context.rowset_id, seg_id);
+        std::shared_ptr<segment_v2::Segment> segment;
+        auto s = segment_v2::Segment::open(fs, seg_path, cache_path, seg_id, 
_context.tablet_schema,
+                                           &segment);
+        if (!s.ok()) {
+            LOG(WARNING) << "failed to open segment. " << seg_path << ":" << 
s.to_string();
+            return Status::OLAPInternalError(OLAP_ERR_ROWSET_LOAD_FAILED);
+        }
+        segments->push_back(std::move(segment));
+    }
+    return Status::OK();
+}
+
+/* policy of segcompaction target selection:
+ *  1. skip big segments
+ *  2. if the consecutive smalls end up with a big, compact the smalls, except
+ *     single small
+ *  3. if the consecutive smalls end up with small, compact the smalls if the
+ *     length is beyond (config::segcompaction_threshold_segment_num / 2)
+ */
+Status BetaRowsetWriter::_find_longest_consecutive_small_segment(
+        SegCompactionCandidatesSharedPtr segments) {
+    std::vector<segment_v2::SegmentSharedPtr> all_segments;
+    RETURN_NOT_OK(_load_noncompacted_segments(&all_segments));
+
+    std::stringstream ss_all;

Review Comment:
   Do not use stringstream; use fmt instead.
   Also, wrap this in the `VLOG_IS_ON` macro.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to