This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new bcf476b2eb [performance](load) cancel unstarted segcompaction tasks 
when build rowset (#22392) (#22395)
bcf476b2eb is described below

commit bcf476b2ebae7ddba3736934e3195fff67031dec
Author: Kaijie Chen <c...@apache.org>
AuthorDate: Fri Aug 11 17:30:08 2023 +0800

    [performance](load) cancel unstarted segcompaction tasks when build rowset 
(#22392) (#22395)
---
 be/src/olap/rowset/beta_rowset_writer.cpp | 55 ++++++++-----------------------
 be/src/olap/rowset/beta_rowset_writer.h   |  3 +-
 be/src/olap/rowset/segcompaction.cpp      |  7 +++-
 be/src/olap/rowset/segcompaction.h        |  4 +++
 4 files changed, 25 insertions(+), 44 deletions(-)

diff --git a/be/src/olap/rowset/beta_rowset_writer.cpp 
b/be/src/olap/rowset/beta_rowset_writer.cpp
index a12e81d625..85791cd01f 100644
--- a/be/src/olap/rowset/beta_rowset_writer.cpp
+++ b/be/src/olap/rowset/beta_rowset_writer.cpp
@@ -324,23 +324,6 @@ Status BetaRowsetWriter::_rename_compacted_indices(int64_t 
begin, int64_t end, u
     return Status::OK();
 }
 
-Status 
BetaRowsetWriter::_get_segcompaction_candidates(SegCompactionCandidatesSharedPtr&
 segments,
-                                                       bool is_last) {
-    if (is_last) {
-        VLOG_DEBUG << "segcompaction last few segments";
-        // currently we only rename remaining segments to reduce wait time
-        // so that transaction can be committed ASAP
-        RETURN_IF_ERROR(_load_noncompacted_segments(segments.get(), 
_num_segment));
-        for (int i = 0; i < segments->size(); ++i) {
-            
RETURN_IF_ERROR(_rename_compacted_segment_plain(_segcompacted_point++));
-        }
-        segments->clear();
-    } else {
-        RETURN_IF_ERROR(_find_longest_consecutive_small_segment(segments));
-    }
-    return Status::OK();
-}
-
 bool BetaRowsetWriter::_check_and_set_is_doing_segcompaction() {
     std::lock_guard<std::mutex> l(_is_doing_segcompaction_lock);
     if (!_is_doing_segcompaction) {
@@ -363,7 +346,7 @@ Status BetaRowsetWriter::_segcompaction_if_necessary() {
     } else if ((_num_segment - _segcompacted_point) >=
                config::segcompaction_threshold_segment_num) {
         SegCompactionCandidatesSharedPtr segments = 
std::make_shared<SegCompactionCandidates>();
-        status = _get_segcompaction_candidates(segments, false);
+        status = _find_longest_consecutive_small_segment(segments);
         if (LIKELY(status.ok()) && (segments->size() > 0)) {
             LOG(INFO) << "submit segcompaction task, tablet_id:" << 
_context.tablet_id
                       << " rowset_id:" << _context.rowset_id << " segment 
num:" << _num_segment
@@ -382,34 +365,28 @@ Status BetaRowsetWriter::_segcompaction_if_necessary() {
     return status;
 }
 
-Status BetaRowsetWriter::_segcompaction_ramaining_if_necessary() {
-    Status status = Status::OK();
+Status BetaRowsetWriter::_segcompaction_rename_last_segments() {
     DCHECK_EQ(_is_doing_segcompaction, false);
     if (!config::enable_segcompaction) {
         return Status::OK();
     }
     if (_segcompaction_status.load() != OK) {
         return Status::Error<SEGCOMPACTION_FAILED>(
-                "BetaRowsetWriter::_segcompaction_ramaining_if_necessary meet 
invalid state");
+                "BetaRowsetWriter::_segcompaction_rename_last_segments meet 
invalid state");
     }
     if (!_is_segcompacted() || _segcompacted_point == _num_segment) {
         // no need if never segcompact before or all segcompacted
         return Status::OK();
     }
-    _is_doing_segcompaction = true;
-    SegCompactionCandidatesSharedPtr segments = 
std::make_shared<SegCompactionCandidates>();
-    status = _get_segcompaction_candidates(segments, true);
-    if (LIKELY(status.ok()) && (segments->size() > 0)) {
-        LOG(INFO) << "submit segcompaction remaining task, tablet_id:" << 
_context.tablet_id
-                  << " rowset_id:" << _context.rowset_id << " segment num:" << 
_num_segment
-                  << " segcompacted_point:" << _segcompacted_point;
-        status = StorageEngine::instance()->submit_seg_compaction_task(this, 
segments);
-        if (status.ok()) {
-            return status;
-        }
+    // currently we only rename remaining segments to reduce wait time
+    // so that transaction can be committed ASAP
+    VLOG_DEBUG << "segcompaction last few segments";
+    SegCompactionCandidates segments;
+    RETURN_IF_ERROR(_load_noncompacted_segments(&segments, _num_segment));
+    for (int i = 0; i < segments.size(); ++i) {
+        
RETURN_IF_ERROR(_rename_compacted_segment_plain(_segcompacted_point++));
     }
-    _is_doing_segcompaction = false;
-    return status;
+    return Status::OK();
 }
 
 Status BetaRowsetWriter::_do_add_block(const vectorized::Block* block,
@@ -561,19 +538,15 @@ RowsetSharedPtr BetaRowsetWriter::build() {
     // if _segment_start_id is not zero, that means it's a transient rowset 
writer for
     // MoW partial update, don't need to do segment compaction.
     if (_segment_start_id == 0) {
+        _segcompaction_worker.cancel();
         status = wait_flying_segcompaction();
-        if (!status.ok()) {
-            LOG(WARNING) << "segcompaction failed when build new rowset 1st 
wait, res=" << status;
-            return nullptr;
-        }
-        status = _segcompaction_ramaining_if_necessary();
         if (!status.ok()) {
             LOG(WARNING) << "segcompaction failed when build new rowset, res=" 
<< status;
             return nullptr;
         }
-        status = wait_flying_segcompaction();
+        status = _segcompaction_rename_last_segments();
         if (!status.ok()) {
-            LOG(WARNING) << "segcompaction failed when build new rowset 2nd 
wait, res=" << status;
+            LOG(WARNING) << "rename last segments failed when build new 
rowset, res=" << status;
             return nullptr;
         }
 
diff --git a/be/src/olap/rowset/beta_rowset_writer.h 
b/be/src/olap/rowset/beta_rowset_writer.h
index c7554cee72..29c67218c1 100644
--- a/be/src/olap/rowset/beta_rowset_writer.h
+++ b/be/src/olap/rowset/beta_rowset_writer.h
@@ -143,11 +143,10 @@ private:
                                  int64_t* flush_size = nullptr);
     void _build_rowset_meta(std::shared_ptr<RowsetMeta> rowset_meta);
     Status _segcompaction_if_necessary();
-    Status _segcompaction_ramaining_if_necessary();
+    Status _segcompaction_rename_last_segments();
     Status 
_load_noncompacted_segments(std::vector<segment_v2::SegmentSharedPtr>* segments,
                                        size_t num);
     Status 
_find_longest_consecutive_small_segment(SegCompactionCandidatesSharedPtr 
segments);
-    Status _get_segcompaction_candidates(SegCompactionCandidatesSharedPtr& 
segments, bool is_last);
     bool _is_segcompacted() { return (_num_segcompacted > 0) ? true : false; }
 
     bool _check_and_set_is_doing_segcompaction();
diff --git a/be/src/olap/rowset/segcompaction.cpp 
b/be/src/olap/rowset/segcompaction.cpp
index f8bf7f4dde..f967cb8696 100644
--- a/be/src/olap/rowset/segcompaction.cpp
+++ b/be/src/olap/rowset/segcompaction.cpp
@@ -293,7 +293,12 @@ Status 
SegcompactionWorker::_do_compact_segments(SegCompactionCandidatesSharedPt
 }
 
 void SegcompactionWorker::compact_segments(SegCompactionCandidatesSharedPtr 
segments) {
-    Status status = _do_compact_segments(segments);
+    Status status = Status::OK();
+    if (_cancelled) {
+        LOG(INFO) << "segcompaction worker is cancelled, skipping 
segcompaction task";
+    } else {
+        status = _do_compact_segments(segments);
+    }
     if (!status.ok()) {
         int16_t errcode = status.code();
         switch (errcode) {
diff --git a/be/src/olap/rowset/segcompaction.h 
b/be/src/olap/rowset/segcompaction.h
index 33fcf4ba28..e9484c317a 100644
--- a/be/src/olap/rowset/segcompaction.h
+++ b/be/src/olap/rowset/segcompaction.h
@@ -54,6 +54,9 @@ public:
 
     io::FileWriterPtr& get_file_writer() { return _file_writer; }
 
+    // set the cancel flag, tasks already started will not be cancelled.
+    void cancel() { _cancelled = true; }
+
 private:
     Status _create_segment_writer_for_segcompaction(
             std::unique_ptr<segment_v2::SegmentWriter>* writer, uint64_t 
begin, uint64_t end);
@@ -74,5 +77,6 @@ private:
     //TODO(zhengyu): current impl depends heavily on the access to feilds of 
BetaRowsetWriter
     BetaRowsetWriter* _writer;
     io::FileWriterPtr _file_writer;
+    std::atomic<bool> _cancelled = false;
 };
 } // namespace doris


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to