This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-1.2-lts
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-1.2-lts by this push:
     new cc14daf0df [cherry-pick](compaction) safe exist #15021 (#15102)
cc14daf0df is described below

commit cc14daf0dfbe6645d964fab3492e50447b563d5d
Author: yixiutt <102007456+yixi...@users.noreply.github.com>
AuthorDate: Mon Dec 19 14:10:43 2022 +0800

    [cherry-pick](compaction) safe exist #15021 (#15102)
---
 be/src/olap/merger.cpp         | 12 ++++++++++--
 be/src/olap/storage_engine.cpp |  2 ++
 be/src/olap/storage_engine.h   |  3 +++
 3 files changed, 15 insertions(+), 2 deletions(-)

diff --git a/be/src/olap/merger.cpp b/be/src/olap/merger.cpp
index ace3d6b39a..af19320c0b 100644
--- a/be/src/olap/merger.cpp
+++ b/be/src/olap/merger.cpp
@@ -67,7 +67,7 @@ Status Merger::merge_rowsets(TabletSharedPtr tablet, ReaderType reader_type,
 
     // The following procedure would last for long time, half of one day, etc.
     int64_t output_rows = 0;
-    while (true) {
+    while (!StorageEngine::instance()->stopped()) {
         ObjectPool objectPool;
         bool eof = false;
         // Read one row into row_cursor
@@ -89,6 +89,10 @@ Status Merger::merge_rowsets(TabletSharedPtr tablet, ReaderType reader_type,
         // so we should release memory immediately
         mem_pool->clear();
     }
+    if (StorageEngine::instance()->stopped()) {
+        LOG(INFO) << "tablet " << tablet->full_name() << "failed to do compaction, engine stopped";
+        return Status::InternalError("engine stopped");
+    }
 
     if (stats_output != nullptr) {
         stats_output->output_rows = output_rows;
@@ -155,7 +159,7 @@ Status Merger::vmerge_rowsets(TabletSharedPtr tablet, ReaderType reader_type,
     vectorized::Block block = cur_tablet_schema->create_block(reader_params.return_columns);
     size_t output_rows = 0;
     bool eof = false;
-    while (!eof) {
+    while (!eof && !StorageEngine::instance()->stopped()) {
         // Read one block from block reader
         RETURN_NOT_OK_LOG(
                 reader.next_block_with_aggregation(&block, nullptr, nullptr, &eof),
@@ -174,6 +178,10 @@ Status Merger::vmerge_rowsets(TabletSharedPtr tablet, ReaderType reader_type,
         output_rows += block.rows();
         block.clear_column_data();
     }
+    if (StorageEngine::instance()->stopped()) {
+        LOG(INFO) << "tablet " << tablet->full_name() << "failed to do compaction, engine stopped";
+        return Status::InternalError("engine stopped");
+    }
 
     if (stats_output != nullptr) {
         stats_output->output_rows = output_rows;
diff --git a/be/src/olap/storage_engine.cpp b/be/src/olap/storage_engine.cpp
index e08b3d562e..e73a5ba8c1 100644
--- a/be/src/olap/storage_engine.cpp
+++ b/be/src/olap/storage_engine.cpp
@@ -106,6 +106,7 @@ StorageEngine::StorageEngine(const EngineOptions& options)
           _available_storage_medium_type_count(0),
           _effective_cluster_id(-1),
           _is_all_cluster_id_exist(true),
+          _stopped(false),
           _mem_tracker(std::make_shared<MemTracker>("StorageEngine")),
           _segcompaction_mem_tracker(std::make_shared<MemTracker>("SegCompaction")),
           _segment_meta_mem_tracker(std::make_shared<MemTracker>("SegmentMeta")),
@@ -573,6 +574,7 @@ void StorageEngine::stop() {
     THREADS_JOIN(_path_gc_threads);
     THREADS_JOIN(_path_scan_threads);
 #undef THREADS_JOIN
+    _stopped = true;
 }
 
 void StorageEngine::_clear() {
diff --git a/be/src/olap/storage_engine.h b/be/src/olap/storage_engine.h
index 6de80ead25..49ff072a59 100644
--- a/be/src/olap/storage_engine.h
+++ b/be/src/olap/storage_engine.h
@@ -169,6 +169,7 @@ public:
     Status start_trash_sweep(double* usage, bool ignore_guard = false);
 
     void stop();
+    bool stopped() { return _stopped; }
 
     void create_cumulative_compaction(TabletSharedPtr best_tablet,
                                       std::shared_ptr<CumulativeCompaction>& cumulative_compaction);
@@ -317,6 +318,8 @@ private:
     int32_t _effective_cluster_id;
     bool _is_all_cluster_id_exist;
 
+    bool _stopped;
+
     static StorageEngine* _s_instance;
 
     std::mutex _gc_mutex;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to