This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch branch-1.2-lts in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-1.2-lts by this push: new cc14daf0df [cherry-pick](compaction) safe exist #15021 (#15102) cc14daf0df is described below commit cc14daf0dfbe6645d964fab3492e50447b563d5d Author: yixiutt <102007456+yixi...@users.noreply.github.com> AuthorDate: Mon Dec 19 14:10:43 2022 +0800 [cherry-pick](compaction) safe exist #15021 (#15102) --- be/src/olap/merger.cpp | 12 ++++++++++-- be/src/olap/storage_engine.cpp | 2 ++ be/src/olap/storage_engine.h | 3 +++ 3 files changed, 15 insertions(+), 2 deletions(-) diff --git a/be/src/olap/merger.cpp b/be/src/olap/merger.cpp index ace3d6b39a..af19320c0b 100644 --- a/be/src/olap/merger.cpp +++ b/be/src/olap/merger.cpp @@ -67,7 +67,7 @@ Status Merger::merge_rowsets(TabletSharedPtr tablet, ReaderType reader_type, // The following procedure would last for long time, half of one day, etc. int64_t output_rows = 0; - while (true) { + while (!StorageEngine::instance()->stopped()) { ObjectPool objectPool; bool eof = false; // Read one row into row_cursor @@ -89,6 +89,10 @@ Status Merger::merge_rowsets(TabletSharedPtr tablet, ReaderType reader_type, // so we should release memory immediately mem_pool->clear(); } + if (StorageEngine::instance()->stopped()) { + LOG(INFO) << "tablet " << tablet->full_name() << "failed to do compaction, engine stopped"; + return Status::InternalError("engine stopped"); + } if (stats_output != nullptr) { stats_output->output_rows = output_rows; @@ -155,7 +159,7 @@ Status Merger::vmerge_rowsets(TabletSharedPtr tablet, ReaderType reader_type, vectorized::Block block = cur_tablet_schema->create_block(reader_params.return_columns); size_t output_rows = 0; bool eof = false; - while (!eof) { + while (!eof && !StorageEngine::instance()->stopped()) { // Read one block from block reader RETURN_NOT_OK_LOG( reader.next_block_with_aggregation(&block, nullptr, nullptr, &eof), @@ -174,6 +178,10 @@ Status Merger::vmerge_rowsets(TabletSharedPtr tablet, ReaderType reader_type, output_rows += block.rows(); block.clear_column_data(); } + if (StorageEngine::instance()->stopped()) { + LOG(INFO) << "tablet " << tablet->full_name() << "failed to do compaction, engine stopped"; + return Status::InternalError("engine stopped"); + } if (stats_output != nullptr) { stats_output->output_rows = output_rows; diff --git a/be/src/olap/storage_engine.cpp b/be/src/olap/storage_engine.cpp index e08b3d562e..e73a5ba8c1 100644 --- a/be/src/olap/storage_engine.cpp +++ b/be/src/olap/storage_engine.cpp @@ -106,6 +106,7 @@ StorageEngine::StorageEngine(const EngineOptions& options) _available_storage_medium_type_count(0), _effective_cluster_id(-1), _is_all_cluster_id_exist(true), + _stopped(false), _mem_tracker(std::make_shared<MemTracker>("StorageEngine")), _segcompaction_mem_tracker(std::make_shared<MemTracker>("SegCompaction")), _segment_meta_mem_tracker(std::make_shared<MemTracker>("SegmentMeta")), @@ -573,6 +574,7 @@ void StorageEngine::stop() { THREADS_JOIN(_path_gc_threads); THREADS_JOIN(_path_scan_threads); #undef THREADS_JOIN + _stopped = true; } void StorageEngine::_clear() { diff --git a/be/src/olap/storage_engine.h b/be/src/olap/storage_engine.h index 6de80ead25..49ff072a59 100644 --- a/be/src/olap/storage_engine.h +++ b/be/src/olap/storage_engine.h @@ -169,6 +169,7 @@ public: Status start_trash_sweep(double* usage, bool ignore_guard = false); void stop(); + bool stopped() { return _stopped; } void create_cumulative_compaction(TabletSharedPtr best_tablet, std::shared_ptr<CumulativeCompaction>& cumulative_compaction); @@ -317,6 +318,8 @@ private: int32_t _effective_cluster_id; bool _is_all_cluster_id_exist; + bool _stopped; + static StorageEngine* _s_instance; std::mutex _gc_mutex; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org