This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new bcf476b2eb [peformance](load) cancel unstarted segcompaction tasks when build rowset (#22392) (#22395)
bcf476b2eb is described below

commit bcf476b2ebae7ddba3736934e3195fff67031dec
Author: Kaijie Chen <c...@apache.org>
AuthorDate: Fri Aug 11 17:30:08 2023 +0800

    [peformance](load) cancel unstarted segcompaction tasks when build rowset (#22392) (#22395)
---
 be/src/olap/rowset/beta_rowset_writer.cpp | 55 ++++++++-----------------------
 be/src/olap/rowset/beta_rowset_writer.h   |  3 +-
 be/src/olap/rowset/segcompaction.cpp      |  7 +++-
 be/src/olap/rowset/segcompaction.h        |  4 +++
 4 files changed, 25 insertions(+), 44 deletions(-)

diff --git a/be/src/olap/rowset/beta_rowset_writer.cpp b/be/src/olap/rowset/beta_rowset_writer.cpp
index a12e81d625..85791cd01f 100644
--- a/be/src/olap/rowset/beta_rowset_writer.cpp
+++ b/be/src/olap/rowset/beta_rowset_writer.cpp
@@ -324,23 +324,6 @@ Status BetaRowsetWriter::_rename_compacted_indices(int64_t begin, int64_t end, u
     return Status::OK();
 }
 
-Status BetaRowsetWriter::_get_segcompaction_candidates(SegCompactionCandidatesSharedPtr& segments,
-                                                       bool is_last) {
-    if (is_last) {
-        VLOG_DEBUG << "segcompaction last few segments";
-        // currently we only rename remaining segments to reduce wait time
-        // so that transaction can be committed ASAP
-        RETURN_IF_ERROR(_load_noncompacted_segments(segments.get(), _num_segment));
-        for (int i = 0; i < segments->size(); ++i) {
-            RETURN_IF_ERROR(_rename_compacted_segment_plain(_segcompacted_point++));
-        }
-        segments->clear();
-    } else {
-        RETURN_IF_ERROR(_find_longest_consecutive_small_segment(segments));
-    }
-    return Status::OK();
-}
-
 bool BetaRowsetWriter::_check_and_set_is_doing_segcompaction() {
     std::lock_guard<std::mutex> l(_is_doing_segcompaction_lock);
     if (!_is_doing_segcompaction) {
@@ -363,7 +346,7 @@ Status BetaRowsetWriter::_segcompaction_if_necessary() {
     } else if ((_num_segment - _segcompacted_point) >=
                config::segcompaction_threshold_segment_num) {
         SegCompactionCandidatesSharedPtr segments = std::make_shared<SegCompactionCandidates>();
-        status = _get_segcompaction_candidates(segments, false);
+        status = _find_longest_consecutive_small_segment(segments);
         if (LIKELY(status.ok()) && (segments->size() > 0)) {
             LOG(INFO) << "submit segcompaction task, tablet_id:" << _context.tablet_id
                       << " rowset_id:" << _context.rowset_id << " segment num:" << _num_segment
@@ -382,34 +365,28 @@ Status BetaRowsetWriter::_segcompaction_if_necessary() {
     return status;
 }
 
-Status BetaRowsetWriter::_segcompaction_ramaining_if_necessary() {
-    Status status = Status::OK();
+Status BetaRowsetWriter::_segcompaction_rename_last_segments() {
     DCHECK_EQ(_is_doing_segcompaction, false);
     if (!config::enable_segcompaction) {
         return Status::OK();
     }
     if (_segcompaction_status.load() != OK) {
         return Status::Error<SEGCOMPACTION_FAILED>(
-                "BetaRowsetWriter::_segcompaction_ramaining_if_necessary meet invalid state");
+                "BetaRowsetWriter::_segcompaction_rename_last_segments meet invalid state");
     }
     if (!_is_segcompacted() || _segcompacted_point == _num_segment) {
         // no need if never segcompact before or all segcompacted
         return Status::OK();
     }
-    _is_doing_segcompaction = true;
-    SegCompactionCandidatesSharedPtr segments = std::make_shared<SegCompactionCandidates>();
-    status = _get_segcompaction_candidates(segments, true);
-    if (LIKELY(status.ok()) && (segments->size() > 0)) {
-        LOG(INFO) << "submit segcompaction remaining task, tablet_id:" << _context.tablet_id
-                  << " rowset_id:" << _context.rowset_id << " segment num:" << _num_segment
-                  << " segcompacted_point:" << _segcompacted_point;
-        status = StorageEngine::instance()->submit_seg_compaction_task(this, segments);
-        if (status.ok()) {
-            return status;
-        }
+    // currently we only rename remaining segments to reduce wait time
+    // so that transaction can be committed ASAP
+    VLOG_DEBUG << "segcompaction last few segments";
+    SegCompactionCandidates segments;
+    RETURN_IF_ERROR(_load_noncompacted_segments(&segments, _num_segment));
+    for (int i = 0; i < segments.size(); ++i) {
+        RETURN_IF_ERROR(_rename_compacted_segment_plain(_segcompacted_point++));
     }
-    _is_doing_segcompaction = false;
-    return status;
+    return Status::OK();
 }
 
 Status BetaRowsetWriter::_do_add_block(const vectorized::Block* block,
@@ -561,19 +538,15 @@ RowsetSharedPtr BetaRowsetWriter::build() {
     // if _segment_start_id is not zero, that means it's a transient rowset writer for
     // MoW partial update, don't need to do segment compaction.
     if (_segment_start_id == 0) {
+        _segcompaction_worker.cancel();
         status = wait_flying_segcompaction();
-        if (!status.ok()) {
-            LOG(WARNING) << "segcompaction failed when build new rowset 1st wait, res=" << status;
-            return nullptr;
-        }
-        status = _segcompaction_ramaining_if_necessary();
         if (!status.ok()) {
             LOG(WARNING) << "segcompaction failed when build new rowset, res=" << status;
             return nullptr;
         }
-        status = wait_flying_segcompaction();
+        status = _segcompaction_rename_last_segments();
         if (!status.ok()) {
-            LOG(WARNING) << "segcompaction failed when build new rowset 2nd wait, res=" << status;
+            LOG(WARNING) << "rename last segments failed when build new rowset, res=" << status;
             return nullptr;
         }
 
diff --git a/be/src/olap/rowset/beta_rowset_writer.h b/be/src/olap/rowset/beta_rowset_writer.h
index c7554cee72..29c67218c1 100644
--- a/be/src/olap/rowset/beta_rowset_writer.h
+++ b/be/src/olap/rowset/beta_rowset_writer.h
@@ -143,11 +143,10 @@ private:
                                  int64_t* flush_size = nullptr);
     void _build_rowset_meta(std::shared_ptr<RowsetMeta> rowset_meta);
     Status _segcompaction_if_necessary();
-    Status _segcompaction_ramaining_if_necessary();
+    Status _segcompaction_rename_last_segments();
     Status _load_noncompacted_segments(std::vector<segment_v2::SegmentSharedPtr>* segments,
                                        size_t num);
     Status _find_longest_consecutive_small_segment(SegCompactionCandidatesSharedPtr segments);
-    Status _get_segcompaction_candidates(SegCompactionCandidatesSharedPtr& segments, bool is_last);
     bool _is_segcompacted() { return (_num_segcompacted > 0) ? true : false; }
 
     bool _check_and_set_is_doing_segcompaction();
diff --git a/be/src/olap/rowset/segcompaction.cpp b/be/src/olap/rowset/segcompaction.cpp
index f8bf7f4dde..f967cb8696 100644
--- a/be/src/olap/rowset/segcompaction.cpp
+++ b/be/src/olap/rowset/segcompaction.cpp
@@ -293,7 +293,12 @@ Status SegcompactionWorker::_do_compact_segments(SegCompactionCandidatesSharedPt
 }
 
 void SegcompactionWorker::compact_segments(SegCompactionCandidatesSharedPtr segments) {
-    Status status = _do_compact_segments(segments);
+    Status status = Status::OK();
+    if (_cancelled) {
+        LOG(INFO) << "segcompaction worker is cancelled, skipping segcompaction task";
+    } else {
+        status = _do_compact_segments(segments);
+    }
     if (!status.ok()) {
         int16_t errcode = status.code();
         switch (errcode) {
diff --git a/be/src/olap/rowset/segcompaction.h b/be/src/olap/rowset/segcompaction.h
index 33fcf4ba28..e9484c317a 100644
--- a/be/src/olap/rowset/segcompaction.h
+++ b/be/src/olap/rowset/segcompaction.h
@@ -54,6 +54,9 @@ public:
 
     io::FileWriterPtr& get_file_writer() { return _file_writer; }
 
+    // set the cancel flag, tasks already started will not be cancelled.
+    void cancel() { _cancelled = true; }
+
 private:
     Status _create_segment_writer_for_segcompaction(
             std::unique_ptr<segment_v2::SegmentWriter>* writer, uint64_t begin, uint64_t end);
@@ -74,5 +77,6 @@ private:
     //TODO(zhengyu): current impl depends heavily on the access to feilds of BetaRowsetWriter
     BetaRowsetWriter* _writer;
     io::FileWriterPtr _file_writer;
+    std::atomic<bool> _cancelled = false;
 };
 } // namespace doris


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org