freemandealer commented on code in PR #14595:
URL: https://github.com/apache/doris/pull/14595#discussion_r1032868944


##########
be/src/olap/rowset/beta_rowset_writer.cpp:
##########
@@ -552,6 +576,62 @@ Status 
BetaRowsetWriter::_segcompaction_ramaining_if_necessary() {
     return status;
 }
 
+Status BetaRowsetWriter::_segcompaction_finalpass_wait() {
+    Status status = Status::OK();
+    DCHECK_EQ(_is_doing_segcompaction, false);
+    if (!config::enable_segcompaction ||
+        !config::enable_storage_vectorization) { //TODO: final pass indep
+        return Status::OK();
+    }
+    if (_segcompaction_status.load() != OLAP_SUCCESS) {
+        return Status::OLAPInternalError(OLAP_ERR_SEGCOMPACTION_FAILED);
+    }
+
+    int64_t num_seg = _is_segcompacted() ? _num_segcompacted : _num_segment;
+    if (num_seg <= config::max_segment_num_per_rowset) {
+        LOG(INFO) << "num_seg less than max_segment_num_per_rowset, skip final 
pass: " << num_seg
+                  << " vs " << config::max_segment_num_per_rowset;
+        return Status::OK();
+    }
+
+    // reset num counters for the final segcompaction
+    _num_segment = _num_segcompacted.load();
+    _num_segcompacted = 0;
+    _segcompacted_point = 0;
+
+    _total_row_num_written = 0;
+    {
+        std::lock_guard<std::mutex> lock(_segid_statistics_map_mutex);
+        for (const auto& itr : _segid_statistics_map) {
+            _total_row_num_written += itr.second.row_num;
+        }
+    }
+
+    while (_segcompacted_point != _num_segment) {
+        _is_doing_segcompaction = true;
+        SegCompactionCandidatesSharedPtr segments = 
std::make_shared<SegCompactionCandidates>();
+        status = _get_segcompaction_candidates(segments, 
FINAL_SEGCOMPACTION_TYPE);
+        if (LIKELY(status.ok()) && (segments->size() > 0)) {
+            LOG(INFO) << "submit segcompaction final task, tablet_id:" << 
_context.tablet_id
+                      << " rowset_id:" << _context.rowset_id << " segment 
num:" << _num_segment
+                      << " segcompacted_point:" << _segcompacted_point;
+            status = 
StorageEngine::instance()->submit_seg_compaction_task(this, segments);
+            if (!status.ok()) {
+                _is_doing_segcompaction = false;
+                return status;
+            }
+            status = _wait_flying_segcompaction();
+            if (!status.ok()) {
+                LOG(WARNING) << "segcompaction failed when build new rowset 
final wait, res="
+                             << status;
+                return status;
+            }
+        }

Review Comment:
   Reset the flag here: `_is_doing_segcompaction = false;`



##########
be/src/olap/rowset/beta_rowset_writer.cpp:
##########
@@ -552,6 +576,62 @@ Status 
BetaRowsetWriter::_segcompaction_ramaining_if_necessary() {
     return status;
 }
 
+Status BetaRowsetWriter::_segcompaction_finalpass_wait() {
+    Status status = Status::OK();
+    DCHECK_EQ(_is_doing_segcompaction, false);
+    if (!config::enable_segcompaction ||
+        !config::enable_storage_vectorization) { //TODO: final pass indep
+        return Status::OK();
+    }
+    if (_segcompaction_status.load() != OLAP_SUCCESS) {
+        return Status::OLAPInternalError(OLAP_ERR_SEGCOMPACTION_FAILED);
+    }
+
+    int64_t num_seg = _is_segcompacted() ? _num_segcompacted : _num_segment;
+    if (num_seg <= config::max_segment_num_per_rowset) {
+        LOG(INFO) << "num_seg less than max_segment_num_per_rowset, skip final 
pass: " << num_seg
+                  << " vs " << config::max_segment_num_per_rowset;
+        return Status::OK();
+    }
+
+    // reset num counters for the final segcompaction
+    _num_segment = _num_segcompacted.load();
+    _num_segcompacted = 0;
+    _segcompacted_point = 0;
+
+    _total_row_num_written = 0;
+    {
+        std::lock_guard<std::mutex> lock(_segid_statistics_map_mutex);
+        for (const auto& itr : _segid_statistics_map) {
+            _total_row_num_written += itr.second.row_num;
+        }
+    }
+
+    while (_segcompacted_point != _num_segment) {
+        _is_doing_segcompaction = true;
+        SegCompactionCandidatesSharedPtr segments = 
std::make_shared<SegCompactionCandidates>();
+        status = _get_segcompaction_candidates(segments, 
FINAL_SEGCOMPACTION_TYPE);
+        if (LIKELY(status.ok()) && (segments->size() > 0)) {
+            LOG(INFO) << "submit segcompaction final task, tablet_id:" << 
_context.tablet_id
+                      << " rowset_id:" << _context.rowset_id << " segment 
num:" << _num_segment
+                      << " segcompacted_point:" << _segcompacted_point;
+            status = 
StorageEngine::instance()->submit_seg_compaction_task(this, segments);
+            if (!status.ok()) {
+                _is_doing_segcompaction = false;
+                return status;

Review Comment:
   Continue and retry here instead of breaking out and failing.



##########
be/src/olap/rowset/beta_rowset_writer.cpp:
##########
@@ -552,6 +576,62 @@ Status 
BetaRowsetWriter::_segcompaction_ramaining_if_necessary() {
     return status;
 }
 
+Status BetaRowsetWriter::_segcompaction_finalpass_wait() {
+    Status status = Status::OK();
+    DCHECK_EQ(_is_doing_segcompaction, false);
+    if (!config::enable_segcompaction ||
+        !config::enable_storage_vectorization) { //TODO: final pass indep
+        return Status::OK();
+    }
+    if (_segcompaction_status.load() != OLAP_SUCCESS) {
+        return Status::OLAPInternalError(OLAP_ERR_SEGCOMPACTION_FAILED);
+    }
+
+    int64_t num_seg = _is_segcompacted() ? _num_segcompacted : _num_segment;
+    if (num_seg <= config::max_segment_num_per_rowset) {
+        LOG(INFO) << "num_seg less than max_segment_num_per_rowset, skip final 
pass: " << num_seg
+                  << " vs " << config::max_segment_num_per_rowset;
+        return Status::OK();
+    }
+
+    // reset num counters for the final segcompaction
+    _num_segment = _num_segcompacted.load();
+    _num_segcompacted = 0;
+    _segcompacted_point = 0;
+
+    _total_row_num_written = 0;
+    {
+        std::lock_guard<std::mutex> lock(_segid_statistics_map_mutex);
+        for (const auto& itr : _segid_statistics_map) {
+            _total_row_num_written += itr.second.row_num;
+        }
+    }
+
+    while (_segcompacted_point != _num_segment) {
+        _is_doing_segcompaction = true;
+        SegCompactionCandidatesSharedPtr segments = 
std::make_shared<SegCompactionCandidates>();
+        status = _get_segcompaction_candidates(segments, 
FINAL_SEGCOMPACTION_TYPE);
+        if (LIKELY(status.ok()) && (segments->size() > 0)) {
+            LOG(INFO) << "submit segcompaction final task, tablet_id:" << 
_context.tablet_id
+                      << " rowset_id:" << _context.rowset_id << " segment 
num:" << _num_segment
+                      << " segcompacted_point:" << _segcompacted_point;
+            status = 
StorageEngine::instance()->submit_seg_compaction_task(this, segments);
+            if (!status.ok()) {
+                _is_doing_segcompaction = false;
+                return status;
+            }
+            status = _wait_flying_segcompaction();

Review Comment:
   Return if the status is not OK; the warning log is a duplicate.



##########
be/src/olap/rowset/beta_rowset_writer.cpp:
##########
@@ -552,6 +576,62 @@ Status 
BetaRowsetWriter::_segcompaction_ramaining_if_necessary() {
     return status;
 }
 
+Status BetaRowsetWriter::_segcompaction_finalpass_wait() {
+    Status status = Status::OK();
+    DCHECK_EQ(_is_doing_segcompaction, false);
+    if (!config::enable_segcompaction ||

Review Comment:
   Make the final pass independent (cf. the `//TODO: final pass indep` comment on this condition).



##########
be/src/olap/rowset/beta_rowset_writer.h:
##########
@@ -165,13 +174,19 @@ class BetaRowsetWriter : public RowsetWriter {
 
     // ensure only one inflight segcompaction task for each rowset
     std::atomic<bool> _is_doing_segcompaction;
+
     // enforce compare-and-swap on _is_doing_segcompaction
+#if 0

Review Comment:
   Remove the useless code (the `#if 0` block).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org

Reply via email to