zhannngchen commented on code in PR #12866:
URL: https://github.com/apache/doris/pull/12866#discussion_r977716456


##########
be/src/olap/rowset/beta_rowset_writer.cpp:
##########
@@ -102,6 +110,284 @@ Status BetaRowsetWriter::add_block(const vectorized::Block* block) {
     return _add_block(block, &_segment_writer);
 }
 
+vectorized::VMergeIterator* BetaRowsetWriter::get_segcompaction_reader(
+        SegCompactionCandidatesSharedPtr segments, std::shared_ptr<Schema> 
schema,
+        OlapReaderStatistics* stat) {
+    StorageReadOptions read_options;
+    read_options.stats = stat;
+    read_options.use_page_cache = false;
+    read_options.tablet_schema = _context.tablet_schema;
+    std::vector<std::unique_ptr<RowwiseIterator>> seg_iterators;
+    for (auto& seg_ptr : *segments) {
+        std::unique_ptr<RowwiseIterator> iter;
+        auto s = seg_ptr->new_iterator(*schema, read_options, &iter);
+        if (!s.ok()) {
+            LOG(WARNING) << "failed to create iterator[" << seg_ptr->id() << 
"]: " << s.to_string();
+        }
+        seg_iterators.push_back(std::move(iter));
+    }
+    std::vector<RowwiseIterator*> iterators;
+    for (auto& owned_it : seg_iterators) {
+        // transfer ownership
+        iterators.push_back(owned_it.release());
+    }
+    bool is_unique = (_context.tablet_schema->keys_type() == UNIQUE_KEYS);
+    bool is_reverse = false;
+    auto merge_itr = vectorized::new_merge_iterator(iterators, -1, is_unique, 
is_reverse, nullptr);
+    merge_itr->init(read_options);
+
+    return (vectorized::VMergeIterator*)merge_itr;
+}
+
+std::unique_ptr<segment_v2::SegmentWriter> 
BetaRowsetWriter::create_segcompaction_writer(
+        uint64_t begin, uint64_t end) {
+    Status status;
+    std::unique_ptr<segment_v2::SegmentWriter> writer = nullptr;
+    status = _create_segment_writer_for_segcompaction(&writer, begin, end);
+    if (status != Status::OK()) {
+        writer = nullptr;
+        LOG(ERROR) << "failed to create segment writer for begin:" << begin << 
" end:" << end
+                   << " path:" << writer->get_data_dir()->path();
+    }
+    if (writer->get_data_dir())
+        LOG(INFO) << "segcompaction segment writer created for begin:" << 
begin << " end:" << end
+                  << " path:" << writer->get_data_dir()->path();
+    return writer;
+}
+
+Status BetaRowsetWriter::delete_original_segments(uint32_t begin, uint32_t 
end) {
+    auto fs = _rowset_meta->fs();
+    if (!fs) {
+        return Status::OLAPInternalError(OLAP_ERR_INIT_FAILED);
+    }
+    for (uint32_t i = begin; i <= end; ++i) {
+        auto seg_path = BetaRowset::local_segment_path(_context.tablet_path, 
_context.rowset_id, i);
+        // Even if an error is encountered, these files that have not been 
cleaned up
+        // will be cleaned up by the GC background. So here we only print the 
error
+        // message when we encounter an error.
+        WARN_IF_ERROR(fs->delete_file(seg_path),
+                      strings::Substitute("Failed to delete file=$0", 
seg_path));
+    }
+    return Status::OK();
+}
+
+void BetaRowsetWriter::rename_compacted_segments(int64_t begin, int64_t end) {
+    int ret;
+    auto src_seg_path = 
BetaRowset::local_segment_path_segcompacted(_context.tablet_path,
+                                                                    
_context.rowset_id, begin, end);
+    auto dst_seg_path = BetaRowset::local_segment_path(_context.tablet_path, 
_context.rowset_id,
+                                                       _num_segcompacted++);
+    ret = rename(src_seg_path.c_str(), dst_seg_path.c_str());
+    DCHECK_EQ(ret, 0);
+}
+
+// todo: will rename only do the job? maybe need deep modification
+void BetaRowsetWriter::rename_compacted_segment_plain(uint64_t seg_id) {
+    int ret;
+    auto src_seg_path =
+            BetaRowset::local_segment_path(_context.tablet_path, 
_context.rowset_id, seg_id);
+    auto dst_seg_path = BetaRowset::local_segment_path(_context.tablet_path, 
_context.rowset_id,
+                                                       _num_segcompacted++);
+    LOG(INFO) << "segcompaction skip this segment. rename " << src_seg_path << 
" to "
+              << dst_seg_path;
+    if (src_seg_path.compare(dst_seg_path) != 0) {
+        CHECK_EQ(_segid_statistics_map.find(seg_id + 1) == 
_segid_statistics_map.end(), false);

Review Comment:
   Use `DCHECK_EQ` here instead of `CHECK_EQ`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to