freemandealer commented on code in PR #14595: URL: https://github.com/apache/doris/pull/14595#discussion_r1032868944
########## be/src/olap/rowset/beta_rowset_writer.cpp: ########## @@ -552,6 +576,62 @@ Status BetaRowsetWriter::_segcompaction_ramaining_if_necessary() { return status; } +Status BetaRowsetWriter::_segcompaction_finalpass_wait() { + Status status = Status::OK(); + DCHECK_EQ(_is_doing_segcompaction, false); + if (!config::enable_segcompaction || + !config::enable_storage_vectorization) { //TODO: final pass indep + return Status::OK(); + } + if (_segcompaction_status.load() != OLAP_SUCCESS) { + return Status::OLAPInternalError(OLAP_ERR_SEGCOMPACTION_FAILED); + } + + int64_t num_seg = _is_segcompacted() ? _num_segcompacted : _num_segment; + if (num_seg <= config::max_segment_num_per_rowset) { + LOG(INFO) << "num_seg less than max_segment_num_per_rowset, skip final pass: " << num_seg + << " vs " << config::max_segment_num_per_rowset; + return Status::OK(); + } + + // reset num counters for the final segcompaction + _num_segment = _num_segcompacted.load(); + _num_segcompacted = 0; + _segcompacted_point = 0; + + _total_row_num_written = 0; + { + std::lock_guard<std::mutex> lock(_segid_statistics_map_mutex); + for (const auto& itr : _segid_statistics_map) { + _total_row_num_written += itr.second.row_num; + } + } + + while (_segcompacted_point != _num_segment) { + _is_doing_segcompaction = true; + SegCompactionCandidatesSharedPtr segments = std::make_shared<SegCompactionCandidates>(); + status = _get_segcompaction_candidates(segments, FINAL_SEGCOMPACTION_TYPE); + if (LIKELY(status.ok()) && (segments->size() > 0)) { + LOG(INFO) << "submit segcompaction final task, tablet_id:" << _context.tablet_id + << " rowset_id:" << _context.rowset_id << " segment num:" << _num_segment + << " segcompacted_point:" << _segcompacted_point; + status = StorageEngine::instance()->submit_seg_compaction_task(this, segments); + if (!status.ok()) { + _is_doing_segcompaction = false; + return status; + } + status = _wait_flying_segcompaction(); + if (!status.ok()) { + LOG(WARNING) << "segcompaction 
failed when build new rowset final wait, res=" + << status; + return status; + } + } Review Comment: is_doing_segcompaction = false; ########## be/src/olap/rowset/beta_rowset_writer.cpp: ########## @@ -552,6 +576,62 @@ Status BetaRowsetWriter::_segcompaction_ramaining_if_necessary() { return status; } +Status BetaRowsetWriter::_segcompaction_finalpass_wait() { + Status status = Status::OK(); + DCHECK_EQ(_is_doing_segcompaction, false); + if (!config::enable_segcompaction || + !config::enable_storage_vectorization) { //TODO: final pass indep + return Status::OK(); + } + if (_segcompaction_status.load() != OLAP_SUCCESS) { + return Status::OLAPInternalError(OLAP_ERR_SEGCOMPACTION_FAILED); + } + + int64_t num_seg = _is_segcompacted() ? _num_segcompacted : _num_segment; + if (num_seg <= config::max_segment_num_per_rowset) { + LOG(INFO) << "num_seg less than max_segment_num_per_rowset, skip final pass: " << num_seg + << " vs " << config::max_segment_num_per_rowset; + return Status::OK(); + } + + // reset num counters for the final segcompaction + _num_segment = _num_segcompacted.load(); + _num_segcompacted = 0; + _segcompacted_point = 0; + + _total_row_num_written = 0; + { + std::lock_guard<std::mutex> lock(_segid_statistics_map_mutex); + for (const auto& itr : _segid_statistics_map) { + _total_row_num_written += itr.second.row_num; + } + } + + while (_segcompacted_point != _num_segment) { + _is_doing_segcompaction = true; + SegCompactionCandidatesSharedPtr segments = std::make_shared<SegCompactionCandidates>(); + status = _get_segcompaction_candidates(segments, FINAL_SEGCOMPACTION_TYPE); + if (LIKELY(status.ok()) && (segments->size() > 0)) { + LOG(INFO) << "submit segcompaction final task, tablet_id:" << _context.tablet_id + << " rowset_id:" << _context.rowset_id << " segment num:" << _num_segment + << " segcompacted_point:" << _segcompacted_point; + status = StorageEngine::instance()->submit_seg_compaction_task(this, segments); + if (!status.ok()) { + 
_is_doing_segcompaction = false; + return status; Review Comment: continue&retry instead of break&fail ########## be/src/olap/rowset/beta_rowset_writer.cpp: ########## @@ -552,6 +576,62 @@ Status BetaRowsetWriter::_segcompaction_ramaining_if_necessary() { return status; } +Status BetaRowsetWriter::_segcompaction_finalpass_wait() { + Status status = Status::OK(); + DCHECK_EQ(_is_doing_segcompaction, false); + if (!config::enable_segcompaction || + !config::enable_storage_vectorization) { //TODO: final pass indep + return Status::OK(); + } + if (_segcompaction_status.load() != OLAP_SUCCESS) { + return Status::OLAPInternalError(OLAP_ERR_SEGCOMPACTION_FAILED); + } + + int64_t num_seg = _is_segcompacted() ? _num_segcompacted : _num_segment; + if (num_seg <= config::max_segment_num_per_rowset) { + LOG(INFO) << "num_seg less than max_segment_num_per_rowset, skip final pass: " << num_seg + << " vs " << config::max_segment_num_per_rowset; + return Status::OK(); + } + + // reset num counters for the final segcompaction + _num_segment = _num_segcompacted.load(); + _num_segcompacted = 0; + _segcompacted_point = 0; + + _total_row_num_written = 0; + { + std::lock_guard<std::mutex> lock(_segid_statistics_map_mutex); + for (const auto& itr : _segid_statistics_map) { + _total_row_num_written += itr.second.row_num; + } + } + + while (_segcompacted_point != _num_segment) { + _is_doing_segcompaction = true; + SegCompactionCandidatesSharedPtr segments = std::make_shared<SegCompactionCandidates>(); + status = _get_segcompaction_candidates(segments, FINAL_SEGCOMPACTION_TYPE); + if (LIKELY(status.ok()) && (segments->size() > 0)) { + LOG(INFO) << "submit segcompaction final task, tablet_id:" << _context.tablet_id + << " rowset_id:" << _context.rowset_id << " segment num:" << _num_segment + << " segcompacted_point:" << _segcompacted_point; + status = StorageEngine::instance()->submit_seg_compaction_task(this, segments); + if (!status.ok()) { + _is_doing_segcompaction = false; + return 
status; + } + status = _wait_flying_segcompaction(); Review Comment: return if not ok; duplicate warning log ########## be/src/olap/rowset/beta_rowset_writer.cpp: ########## @@ -552,6 +576,62 @@ Status BetaRowsetWriter::_segcompaction_ramaining_if_necessary() { return status; } +Status BetaRowsetWriter::_segcompaction_finalpass_wait() { + Status status = Status::OK(); + DCHECK_EQ(_is_doing_segcompaction, false); + if (!config::enable_segcompaction || Review Comment: indep ########## be/src/olap/rowset/beta_rowset_writer.h: ########## @@ -165,13 +174,19 @@ class BetaRowsetWriter : public RowsetWriter { // ensure only one inflight segcompaction task for each rowset std::atomic<bool> _is_doing_segcompaction; + // enforce compare-and-swap on _is_doing_segcompaction +#if 0 Review Comment: remove useless code -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org